Whamcloud - gitweb
LU-10337 mdt: Allow open of open orphans
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index 7f73595..1025a0d 100644 (file)
@@ -942,12 +942,12 @@ int mdd_changelog_data_store(const struct lu_env *env, struct mdd_device *mdd,
        RETURN(rc);
 }
 
-static int mdd_changelog_data_store_xattr(const struct lu_env *env,
-                                         struct mdd_device *mdd,
-                                         enum changelog_rec_type type,
-                                         int flags, struct mdd_object *mdd_obj,
-                                         const char *xattr_name,
-                                         struct thandle *handle)
+int mdd_changelog_data_store_xattr(const struct lu_env *env,
+                                  struct mdd_device *mdd,
+                                  enum changelog_rec_type type,
+                                  int flags, struct mdd_object *mdd_obj,
+                                  const char *xattr_name,
+                                  struct thandle *handle)
 {
        int                              rc;
 
@@ -1121,6 +1121,30 @@ static inline bool permission_needs_sync(const struct lu_attr *old,
        return false;
 }
 
+static inline __u64 mdd_lmm_dom_size(void *buf)
+{
+       struct lov_mds_md *lmm = buf;
+       struct lov_comp_md_v1 *comp_v1;
+       struct lov_mds_md *v1;
+       __u32 off;
+
+       if (lmm == NULL)
+               return 0;
+
+       if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
+               return 0;
+
+       comp_v1 = (struct lov_comp_md_v1 *)lmm;
+       off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
+       v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+
+       /* DoM entry is the first entry always */
+       if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) == LOV_PATTERN_MDT)
+               return le64_to_cpu(comp_v1->lcm_entries[0].lcme_extent.e_end);
+
+       return 0;
+}
+
 /* set attr and LOV EA at once, return updated attr */
 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                 const struct md_attr *ma)
@@ -1198,6 +1222,25 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                       la->la_mtime, la->la_ctime);
 
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+
+       /* LU-10509: setattr of LA_SIZE should be skipped case of DOM,
+        * otherwise following truncate will do nothing and truncated
+        * data may be read again. This is a quick fix until LU-11033
+        * will be resolved.
+        */
+       if (la_copy->la_valid & LA_SIZE) {
+               struct lu_buf *lov_buf = mdd_buf_get(env, NULL, 0);
+
+               rc = mdd_stripe_get(env, mdd_obj, lov_buf, XATTR_NAME_LOV);
+               if (rc) {
+                       rc = 0;
+               } else {
+                       if (mdd_lmm_dom_size(lov_buf->lb_buf) > 0)
+                               la_copy->la_valid &= ~LA_SIZE;
+                       lu_buf_free(lov_buf);
+               }
+       }
+
        if (la_copy->la_valid) {
                rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
 
@@ -1240,10 +1283,10 @@ static int mdd_xattr_sanity_check(const struct lu_env *env,
                    (uc->uc_fsuid != attr->la_uid) &&
                    !md_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
-       } else {
-               if ((uc->uc_fsuid != attr->la_uid) &&
-                   !md_capable(uc, CFS_CAP_FOWNER))
-                       RETURN(-EPERM);
+       } else if (strcmp(name, XATTR_NAME_SOM) != 0 &&
+                  (uc->uc_fsuid != attr->la_uid) &&
+                  !md_capable(uc, CFS_CAP_FOWNER)) {
+               RETURN(-EPERM);
        }
 
        RETURN(0);
@@ -1282,6 +1325,10 @@ mdd_xattr_changelog_type(const struct lu_env *env, struct mdd_device *mdd,
        if (strcmp(XATTR_NAME_HSM, xattr_name) == 0)
                return CL_HSM;
 
+       /* Avoid logging SOM xattr for every file */
+       if (strcmp(XATTR_NAME_SOM, xattr_name) == 0)
+               return CL_NONE;
+
        if (has_prefix(xattr_name, XATTR_USER_PREFIX) ||
            has_prefix(xattr_name, XATTR_NAME_POSIX_ACL_ACCESS) ||
            has_prefix(xattr_name, XATTR_NAME_POSIX_ACL_DEFAULT) ||
@@ -1289,7 +1336,7 @@ mdd_xattr_changelog_type(const struct lu_env *env, struct mdd_device *mdd,
            has_prefix(xattr_name, XATTR_SECURITY_PREFIX))
                return CL_SETXATTR;
 
-       return -1;
+       return CL_NONE;
 }
 
 static int mdd_declare_xattr_set(const struct lu_env *env,
@@ -1445,7 +1492,7 @@ static int mdd_xattr_merge(const struct lu_env *env, struct md_object *md_obj,
 
        /* get EA of victim file */
        memset(buf_vic, 0, sizeof(*buf_vic));
-       rc = mdd_get_lov_ea(env, vic, buf_vic);
+       rc = mdd_stripe_get(env, vic, buf_vic, XATTR_NAME_LOV);
        if (rc < 0) {
                if (rc == -ENODATA)
                        rc = 0;
@@ -1459,7 +1506,7 @@ static int mdd_xattr_merge(const struct lu_env *env, struct md_object *md_obj,
 
        /* save EA of target file for restore */
        memset(buf, 0, sizeof(*buf));
-       rc = mdd_get_lov_ea(env, obj, buf);
+       rc = mdd_stripe_get(env, obj, buf, XATTR_NAME_LOV);
        if (rc < 0)
                GOTO(out, rc);
 
@@ -1671,7 +1718,7 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
 
        /* get EA of mirrored file */
        memset(buf_save, 0, sizeof(*buf));
-       rc = mdd_get_lov_ea(env, obj, buf_save);
+       rc = mdd_stripe_get(env, obj, buf_save, XATTR_NAME_LOV);
        if (rc < 0)
                GOTO(out, rc);
 
@@ -1806,6 +1853,11 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                RETURN(rc);
        }
 
+       if (strcmp(name, XATTR_NAME_LMV) == 0) {
+               rc = mdd_dir_layout_shrink(env, obj, buf);
+               RETURN(rc);
+       }
+
        if (strcmp(name, XATTR_NAME_ACL_ACCESS) == 0 ||
            strcmp(name, XATTR_NAME_ACL_DEFAULT) == 0) {
                struct posix_acl *acl;
@@ -1939,58 +1991,50 @@ stop:
 }
 
 /*
- * read lov EA of an object
- * return the lov EA in an allocated lu_buf
+ * read lov/lmv EA of an object
+ * return the lov/lmv EA in an allocated lu_buf
  */
-int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
-                  struct lu_buf *lmm_buf)
+int mdd_stripe_get(const struct lu_env *env, struct mdd_object *obj,
+                  struct lu_buf *lmm_buf, const char *name)
 {
-       struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
-       int              rc, bufsize;
+       struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
+       int rc;
+
        ENTRY;
 
-repeat:
-       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV);
+       if (buf->lb_buf == NULL) {
+               buf = lu_buf_check_and_alloc(buf, 4096);
+               if (buf->lb_buf == NULL)
+                       RETURN(-ENOMEM);
+       }
 
+repeat:
+       rc = mdo_xattr_get(env, obj, buf, name);
        if (rc == -ERANGE) {
                /* mti_big_buf is allocated but is too small
                 * we need to increase it */
                buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
                                             buf->lb_len * 2);
                if (buf->lb_buf == NULL)
-                       GOTO(out, rc = -ENOMEM);
+                       RETURN(-ENOMEM);
                goto repeat;
-       }
-
-       if (rc < 0)
+       } else if (rc < 0) {
                RETURN(rc);
-
-       if (rc == 0)
+       } else if (rc == 0) {
                RETURN(-ENODATA);
-
-       bufsize = rc;
-       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
-               /* mti_big_buf was not allocated, so we have to
-                * allocate it based on the ea size */
-               buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
-                                            bufsize);
-               if (buf->lb_buf == NULL)
-                       GOTO(out, rc = -ENOMEM);
-               goto repeat;
        }
 
-       lu_buf_alloc(lmm_buf, bufsize);
+       lu_buf_alloc(lmm_buf, rc);
        if (lmm_buf->lb_buf == NULL)
-               GOTO(out, rc = -ENOMEM);
+               RETURN(-ENOMEM);
 
-       memcpy(lmm_buf->lb_buf, buf->lb_buf, bufsize);
-       rc = 0;
-       EXIT;
+       /*
+        * we don't use lmm_buf directly, because we don't know xattr size, so
+        * by using mti_big_buf we can avoid calling mdo_xattr_get() twice.
+        */
+       memcpy(lmm_buf->lb_buf, buf->lb_buf, rc);
 
-out:
-       if (rc < 0)
-               lu_buf_free(lmm_buf);
-       return rc;
+       RETURN(0);
 }
 
 static int mdd_xattr_hsm_replace(const struct lu_env *env,
@@ -2151,28 +2195,38 @@ static inline int mdd_set_lmm_gen(struct lov_mds_md *lmm, __u32 *gen)
        return mdd_lmm_gen(lmm, gen, false);
 }
 
-static inline __u64 mdd_lmm_dom_size(void *buf)
+static int mdd_dom_data_truncate(const struct lu_env *env,
+                                struct mdd_device *mdd, struct mdd_object *mo)
 {
-       struct lov_mds_md *lmm = buf;
-       struct lov_comp_md_v1 *comp_v1;
-       struct lov_mds_md *v1;
-       __u32 off;
+       struct thandle *th;
+       struct dt_object *dom;
+       int rc;
 
-       if (lmm == NULL)
-               return 0;
+       dom = dt_object_locate(mdd_object_child(mo), mdd->mdd_bottom);
+       if (!dom)
+               GOTO(out, rc = -ENODATA);
 
-       if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
-               return 0;
+       th = dt_trans_create(env, mdd->mdd_bottom);
+       if (IS_ERR(th))
+               GOTO(out, rc = PTR_ERR(th));
 
-       comp_v1 = (struct lov_comp_md_v1 *)lmm;
-       off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
-       v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+       rc = dt_declare_punch(env, dom, 0, OBD_OBJECT_EOF, th);
+       if (rc)
+               GOTO(stop, rc);
 
-       /* DoM entry is the first entry always */
-       if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) == LOV_PATTERN_MDT)
-               return le64_to_cpu(comp_v1->lcm_entries[0].lcme_extent.e_end);
+       rc = dt_trans_start_local(env, mdd->mdd_bottom, th);
+       if (rc != 0)
+               GOTO(stop, rc);
 
-       return 0;
+       rc = dt_punch(env, dom, 0, OBD_OBJECT_EOF, th);
+stop:
+       dt_trans_stop(env, mdd->mdd_bottom, th);
+out:
+       /* Ignore failure but report the error */
+       if (rc)
+               CERROR("%s: "DFID" can't truncate DOM inode data, rc = %d\n",
+                      mdd_obj_dev_name(mo), PFID(mdo2fid(mo)), rc);
+       return rc;
 }
 
 /**
@@ -2181,23 +2235,25 @@ static inline __u64 mdd_lmm_dom_size(void *buf)
 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                            struct md_object *obj2, __u64 flags)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct mdd_object       *fst_o = md2mdd_obj(obj1);
-       struct mdd_object       *snd_o = md2mdd_obj(obj2);
-       struct lu_attr          *fst_la = MDD_ENV_VAR(env, cattr);
-       struct lu_attr          *snd_la = MDD_ENV_VAR(env, tattr);
-       struct mdd_device       *mdd = mdo2mdd(obj1);
-       struct lov_mds_md       *fst_lmm, *snd_lmm;
-       struct lu_buf           *fst_buf = &info->mti_buf[0];
-       struct lu_buf           *snd_buf = &info->mti_buf[1];
-       struct lu_buf           *fst_hsm_buf = &info->mti_buf[2];
-       struct lu_buf           *snd_hsm_buf = &info->mti_buf[3];
-       struct ost_id           *saved_oi = NULL;
-       struct thandle          *handle;
-       __u32                    fst_gen, snd_gen, saved_gen;
-       int                      fst_fl;
-       int                      rc;
-       int                      rc2;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_object *fst_o = md2mdd_obj(obj1);
+       struct mdd_object *snd_o = md2mdd_obj(obj2);
+       struct lu_attr *fst_la = MDD_ENV_VAR(env, cattr);
+       struct lu_attr *snd_la = MDD_ENV_VAR(env, tattr);
+       struct mdd_device *mdd = mdo2mdd(obj1);
+       struct lov_mds_md *fst_lmm, *snd_lmm;
+       struct lu_buf *fst_buf = &info->mti_buf[0];
+       struct lu_buf *snd_buf = &info->mti_buf[1];
+       struct lu_buf *fst_hsm_buf = &info->mti_buf[2];
+       struct lu_buf *snd_hsm_buf = &info->mti_buf[3];
+       struct ost_id *saved_oi = NULL;
+       struct thandle *handle;
+       struct mdd_object *dom_o = NULL;
+       __u64 domsize_dom, domsize_vlt;
+       __u32 fst_gen, snd_gen, saved_gen;
+       int fst_fl;
+       int rc, rc2;
+
        ENTRY;
 
        CLASSERT(ARRAY_SIZE(info->mti_buf) >= 4);
@@ -2233,24 +2289,57 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
        mdd_write_lock(env, fst_o, MOR_TGT_CHILD);
        mdd_write_lock(env, snd_o, MOR_TGT_CHILD);
 
-       rc = mdd_get_lov_ea(env, fst_o, fst_buf);
+       rc = mdd_stripe_get(env, fst_o, fst_buf, XATTR_NAME_LOV);
        if (rc < 0 && rc != -ENODATA)
                GOTO(stop, rc);
 
-       rc = mdd_get_lov_ea(env, snd_o, snd_buf);
+       rc = mdd_stripe_get(env, snd_o, snd_buf, XATTR_NAME_LOV);
        if (rc < 0 && rc != -ENODATA)
                GOTO(stop, rc);
 
-       /* check if file is DoM, it can be migrated only to another DoM layout
-        * with the same DoM component size
+       /* check if file has DoM. DoM file can be migrated only to another
+        * DoM layout with the same DoM component size or to an non-DOM
+        * layout. After migration to OSTs layout, local MDT inode data
+        * should be truncated.
+        * Objects are sorted by FIDs, considering that original file's FID
+        * is always smaller the snd_o is always original file we are migrating
+        * from.
         */
-       if (mdd_lmm_dom_size(fst_buf->lb_buf) !=
-           mdd_lmm_dom_size(snd_buf->lb_buf)) {
+       domsize_dom = mdd_lmm_dom_size(snd_buf->lb_buf);
+       domsize_vlt = mdd_lmm_dom_size(fst_buf->lb_buf);
+
+       /* Only migration is supported for DoM files, not 'swap_layouts' so
+        * target file must be volatile and orphan.
+        */
+       if (fst_o->mod_flags & (ORPHAN_OBJ | VOLATILE_OBJ)) {
+               dom_o = domsize_dom ? snd_o : NULL;
+       } else if (snd_o->mod_flags & (ORPHAN_OBJ | VOLATILE_OBJ)) {
+               swap(domsize_dom, domsize_vlt);
+               dom_o = domsize_dom ? fst_o : NULL;
+       } else if (domsize_dom > 0 || domsize_vlt > 0) {
+               /* 'lfs swap_layouts' case, neither file should have DoM */
+               rc = -EOPNOTSUPP;
+               CDEBUG(D_LAYOUT, "cannot swap layouts with DOM component, "
+                      "use migration instead: rc = %d\n", rc);
+               GOTO(stop, rc);
+       }
+
+       if (domsize_vlt > 0 && domsize_dom == 0) {
+               rc = -EOPNOTSUPP;
+               CDEBUG(D_LAYOUT, "cannot swap layout for "DFID": OST to DOM "
+                      "migration is not supported: rc = %d\n",
+                      PFID(mdo2fid(snd_o)), rc);
+               GOTO(stop, rc);
+       } else if (domsize_vlt > 0 && domsize_dom != domsize_vlt) {
                rc = -EOPNOTSUPP;
                CDEBUG(D_LAYOUT, "cannot swap layout for "DFID": new layout "
-                      "must have the same DoM component: rc = %d\n",
+                      "must have the same DoM component size: rc = %d\n",
                       PFID(mdo2fid(fst_o)), rc);
                GOTO(stop, rc);
+       } else if (domsize_vlt > 0) {
+               /* Migration with the same DOM component size, no need to
+                * truncate local data, it is still being used */
+               dom_o = NULL;
        }
 
        /* swapping 2 non existant layouts is a success */
@@ -2455,6 +2544,10 @@ out_restore:
 stop:
        rc = mdd_trans_stop(env, mdd, rc, handle);
 
+       /* Truncate local DOM data if all went well */
+       if (!rc && dom_o)
+               mdd_dom_data_truncate(env, mdd, dom_o);
+
        mdd_write_unlock(env, snd_o);
        mdd_write_unlock(env, fst_o);
 
@@ -2536,6 +2629,9 @@ mdd_layout_update_rdonly(const struct lu_env *env, struct mdd_object *obj,
                         struct md_layout_change *mlc, struct thandle *handle)
 {
        struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
+       struct lu_buf *som_buf = &mdd_env_info(env)->mti_buf[1];
+       struct lustre_som_attrs *som = &mlc->mlc_som;
+       int fl = 0;
        int rc;
        ENTRY;
 
@@ -2551,13 +2647,28 @@ mdd_layout_update_rdonly(const struct lu_env *env, struct mdd_object *obj,
                RETURN(0);
        }
 
+       som_buf->lb_buf = som;
+       som_buf->lb_len = sizeof(*som);
+       rc = mdo_xattr_get(env, obj, som_buf, XATTR_NAME_SOM);
+       if (rc < 0 && rc != -ENODATA)
+               RETURN(rc);
+
+       if (rc > 0) {
+               lustre_som_swab(som);
+               if (som->lsa_valid & SOM_FL_STRICT)
+                       fl = LU_XATTR_REPLACE;
+       }
+
        rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
        if (rc)
                GOTO(out, rc);
 
-       rc = mdd_declare_xattr_del(env, mdd, obj, XATTR_NAME_SOM, handle);
-       if (rc)
-               GOTO(out, rc);
+       if (fl) {
+               rc = mdd_declare_xattr_set(env, mdd, obj, som_buf,
+                                          XATTR_NAME_SOM, fl, handle);
+               if (rc)
+                       GOTO(out, rc);
+       }
 
        /* record a changelog for data mover to consume */
        rc = mdd_declare_changelog_store(env, mdd, CL_FLRW, NULL, NULL, handle);
@@ -2573,10 +2684,12 @@ mdd_layout_update_rdonly(const struct lu_env *env, struct mdd_object *obj,
 
        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdo_layout_change(env, obj, mlc, handle);
-       if (!rc) {
-               rc = mdo_xattr_del(env, obj, XATTR_NAME_SOM, handle);
-               if (rc == -ENODATA)
-                       rc = 0;
+       if (!rc && fl) {
+               /* SOM state transition from STRICT to STALE */
+               som->lsa_valid = SOM_FL_STALE;
+               lustre_som_swab(som);
+               rc = mdo_xattr_set(env, obj, som_buf, XATTR_NAME_SOM,
+                                  fl, handle);
        }
        mdd_write_unlock(env, obj);
        if (rc)
@@ -2685,12 +2798,13 @@ mdd_object_update_sync_pending(const struct lu_env *env, struct mdd_object *obj,
                RETURN(-EBUSY);
        }
 
-       if (mlc->mlc_som.lsa_valid & LSOM_FL_VALID) {
+       if (mlc->mlc_som.lsa_valid & SOM_FL_STRICT) {
                rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_SOM);
-               if (rc && rc != -ENODATA)
+               if (rc < 0 && rc != -ENODATA)
                        RETURN(rc);
 
                fl = rc == -ENODATA ? LU_XATTR_CREATE : LU_XATTR_REPLACE;
+               lustre_som_swab(&mlc->mlc_som);
                som_buf->lb_buf = &mlc->mlc_som;
                som_buf->lb_len = sizeof(mlc->mlc_som);
        }
@@ -2772,7 +2886,7 @@ mdd_layout_change(const struct lu_env *env, struct md_object *o,
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
 
-       rc = mdd_get_lov_ea(env, obj, buf);
+       rc = mdd_stripe_get(env, obj, buf, XATTR_NAME_LOV);
        if (rc < 0) {
                if (rc == -ENODATA)
                        rc = -EINVAL;
@@ -2874,8 +2988,11 @@ static int mdd_open_sanity_check(const struct lu_env *env,
        int mode, rc;
        ENTRY;
 
-       /* EEXIST check */
-       if (mdd_is_dead_obj(obj))
+       /* EEXIST check, also opening of *open* orphans is allowed so we can
+        * open-by-handle unlinked files
+        */
+       if (mdd_is_dead_obj(obj) &&
+           likely(!(mdd_is_orphan_obj(obj) && obj->mod_count > 0)))
                RETURN(-ENOENT);
 
        if (S_ISLNK(attr->la_mode))