Whamcloud - gitweb
LU-10499 pcc: use foreign layout for PCCRO on server side 75/51375/3
authorQian Yingjin <qian@ddn.com>
Mon, 11 Sep 2023 15:53:31 +0000 (11:53 -0400)
committerOleg Drokin <green@whamcloud.com>
Sat, 23 Sep 2023 06:02:02 +0000 (06:02 +0000)
This patch includes the codes about using foreign layout for PCCRO
on the server side (LOD|MDD|MDT layers).

Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I48467be9fef54bd05432528b685241aa53978d24
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/51375
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
14 files changed:
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/lfsck/lfsck_layout.c
lustre/llite/llite_internal.h
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/lod/lod_object.c
lustre/lod/lod_qos.c
lustre/lov/lov_ea.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_open.c
lustre/ptlrpc/pack_generic.c
lustre/utils/wirecheck.c

index 6157643..044361a 100644 (file)
@@ -179,6 +179,26 @@ struct hsm_attrs {
 };
 extern void lustre_hsm_swab(struct hsm_attrs *attrs);
 
+static inline void lov_foreign_hsm_to_cpu(struct lov_hsm_base *dst,
+                                         const struct lov_foreign_md *lfm)
+{
+       struct lov_hsm_base *src = (struct lov_hsm_base *)lfm->lfm_value;
+
+       dst->lhb_archive_id = __le64_to_cpu(src->lhb_archive_id);
+       dst->lhb_archive_ver = __le64_to_cpu(src->lhb_archive_ver);
+       memcpy(dst->lhb_uuid, src->lhb_uuid, sizeof(dst->lhb_uuid));
+}
+
+static inline void lov_foreign_hsm_to_le(struct lov_foreign_md *lfm,
+                                        struct lov_hsm_base *src)
+{
+       struct lov_hsm_base *dst = (struct lov_hsm_base *)lfm->lfm_value;
+
+       dst->lhb_archive_id = __cpu_to_le64(dst->lhb_archive_id);
+       dst->lhb_archive_ver = __cpu_to_le64(dst->lhb_archive_ver);
+       memcpy(dst->lhb_uuid, src->lhb_uuid, sizeof(dst->lhb_uuid));
+}
+
 /**
  * fid constants
  */
index c782aca..3619ae5 100644 (file)
@@ -744,7 +744,7 @@ static inline bool lov_pattern_supported(__u32 pattern)
 
 /* RELEASED and MDT patterns are not valid in many places, so rather than
  * having many extra checks on lov_pattern_supported, we have this separate
- * check for non-released, non-DOM components
+ * check for non-released, non-readonly, non-DOM components
  */
 static inline bool lov_pattern_supported_normal_comp(__u32 pattern)
 {
@@ -852,10 +852,10 @@ struct lov_foreign_md {
        char lfm_value[];
 } __attribute__((packed));
 
-#define foreign_size(lfm) (((struct lov_foreign_md *)lfm)->lfm_length + \
+#define lov_foreign_size(lfm) (((struct lov_foreign_md *)lfm)->lfm_length + \
                           offsetof(struct lov_foreign_md, lfm_value))
 
-#define foreign_size_le(lfm) \
+#define lov_foreign_size_le(lfm) \
        (le32_to_cpu(((struct lov_foreign_md *)lfm)->lfm_length) + \
        offsetof(struct lov_foreign_md, lfm_value))
 
@@ -1037,6 +1037,11 @@ static inline __u32 lov_user_md_size(__u16 stripes, __u32 lmm_magic)
                                stripes * sizeof(struct lov_user_ost_data_v1);
 }
 
+static inline __u32 lov_foreign_md_size(__u32 length)
+{
+       return length + offsetof(struct lov_foreign_md, lfm_value);
+}
+
 /* Compile with -D_LARGEFILE64_SOURCE or -D_GNU_SOURCE (or #define) to
  * use this.  It is unsafe to #define those values in this header as it
  * is possible the application has already #included <sys/stat.h>. */
@@ -1163,7 +1168,7 @@ struct lustre_foreign_type {
  * LOV/LMV foreign types
  **/
 enum lustre_foreign_types {
-       LU_FOREIGN_TYPE_NONE = 0,
+       LU_FOREIGN_TYPE_NONE    = 0,
        /* HSM copytool lhsm_posix */
        LU_FOREIGN_TYPE_POSIX   = 1,
        /* Used for PCC-RW. PCCRW components are local to a single archive. */
@@ -2286,7 +2291,56 @@ enum ioc_data_version_flags {
 
 /********* HSM **********/
 
-/** HSM per-file state
+#define UUID_MAX       40
+
+struct lov_hsm_base {
+       /* HSM archive ID */
+       __u64   lhb_archive_id;
+       /* Data version associated with the last archiving, if any. */
+       __u64   lhb_archive_ver;
+       /* Identifier within HSM backend */
+       char    lhb_uuid[UUID_MAX];
+};
+
+/**
+ * HSM layout is a kind of FOREIGN layout.
+ */
+struct lov_hsm_md {
+       /* LOV_MAGIC_FOREIGN */
+       __u32                   lhm_magic;
+       /* To make HSM layout compatible with lov_foreign_md, this @length
+        * includes everything after @lhm_flags: sizeof(lhm_archive_id) +
+        * sizeof(lhm_archive_ver) + lenght of lhm_archive_uuid.
+        */
+       __u32                   lhm_length;
+       /* HSM type, see LU_FOREIGN_TYPE_(POSIX, S3, PCCRW, PCCRO}. */
+       __u32                   lhm_type;
+       /* HSM flags, see enum hsm_states */
+       __u32                   lhm_flags;
+       /*
+        * Data structure members above are compatible with @lov_foreign_md.
+        * The following members are private to HSM layout.
+        */
+       struct lov_hsm_base     lhm_hsm;
+} __attribute__((packed));
+
+#define lhm_archive_id         lhm_hsm.lhb_archive_id
+#define lhm_archive_ver                lhm_hsm.lhb_archive_ver
+#define lhm_archive_uuid       lhm_hsm.lhb_uuid
+
+static inline bool lov_hsm_type_supported(__u32 type)
+{
+       return type == LU_FOREIGN_TYPE_POSIX || type == LU_FOREIGN_TYPE_PCCRW ||
+              type == LU_FOREIGN_TYPE_PCCRO || type == LU_FOREIGN_TYPE_S3;
+}
+
+static inline bool lov_foreign_type_supported(__u32 type)
+{
+       return lov_hsm_type_supported(type) || type == LU_FOREIGN_TYPE_SYMLINK;
+}
+
+/**
+ * HSM per-file state
  * See HSM_FLAGS below.
  */
 enum hsm_states {
@@ -2306,7 +2360,8 @@ enum hsm_states {
 #define HSM_USER_MASK   (HS_NORELEASE | HS_NOARCHIVE | HS_DIRTY)
 
 /* Other HSM flags. */
-#define HSM_STATUS_MASK (HS_EXISTS | HS_LOST | HS_RELEASED | HS_ARCHIVED)
+#define HSM_STATUS_MASK (HS_EXISTS | HS_LOST | HS_RELEASED | HS_ARCHIVED | \
+                        HS_PCCRW | HS_PCCRO)
 
 /*
  * All HSM-related possible flags that could be applied to a file.
@@ -2829,7 +2884,7 @@ static inline const char *pcc_type2string(enum lu_pcc_type type)
 
 struct lu_pcc_attach {
        __u32 pcca_type; /* PCC type */
-       __u32 pcca_id; /* archive ID for readwrite, group ID for readonly */
+       __u32 pcca_id; /* Attach ID */
 };
 
 enum lu_pcc_detach_opts {
index e31817a..e786eb3 100644 (file)
@@ -440,6 +440,7 @@ static int lfsck_layout_verify_header(struct dt_object *obj,
                        __u64 start = le64_to_cpu(lcme->lcme_extent.e_start);
                        __u64 end = le64_to_cpu(lcme->lcme_extent.e_end);
                        __u32 comp_id = le32_to_cpu(lcme->lcme_id);
+                       struct lov_mds_md_v1 *v1;
                        bool ext, inited, zero;
                        __u32 flags;
 
@@ -492,10 +493,15 @@ static int lfsck_layout_verify_header(struct dt_object *obj,
                                return -EINVAL;
                        }
 
-                       rc = lfsck_layout_verify_header_v1v3(obj,
-                                       (struct lov_mds_md_v1 *)((char *)lmm +
-                                       le32_to_cpu(lcme->lcme_offset)), start,
-                                       end, comp_id, ext, &p_dom);
+                       v1 = (struct lov_mds_md_v1 *)((char *)lmm +
+                                               le32_to_cpu(lcme->lcme_offset));
+                       if (le32_to_cpu(v1->lmm_magic) == LOV_MAGIC_FOREIGN)
+                               rc = lfsck_layout_verify_header_foreign(
+                                       obj, (struct lov_foreign_md *)v1,
+                                       le32_to_cpu(lcme->lcme_size));
+                       else
+                               rc = lfsck_layout_verify_header_v1v3(obj, v1,
+                                       start, end, comp_id, ext, &p_dom);
 
                        p_zero = zero;
                }
index a168317..60887a7 100644 (file)
@@ -1433,7 +1433,7 @@ static inline ssize_t ll_lov_user_md_size(const struct lov_user_md *lum)
        case LOV_USER_MAGIC_COMP_V1:
                return ((struct lov_comp_md_v1 *)lum)->lcm_size;
        case LOV_USER_MAGIC_FOREIGN:
-               return foreign_size(lum);
+               return lov_foreign_size(lum);
        }
 
        return -EINVAL;
index e158a25..9c308d6 100644 (file)
@@ -177,18 +177,34 @@ struct lod_layout_component {
        struct lu_extent          llc_extent;
        __u32                     llc_id;
        __u32                     llc_flags;
-       __u32                     llc_stripe_size;
-       __u32                     llc_pattern;
-       __u16                     llc_layout_gen;
-       __u16                     llc_stripe_offset;
-       __u16                     llc_stripe_count;
-       __u16                     llc_stripes_allocated;
+       __u32                     llc_magic;
        __u64                     llc_timestamp; /* snapshot time */
-       char                     *llc_pool;
-       /* ost list specified with LOV_USER_MAGIC_SPECIFIC lum */
-       struct lu_tgt_pool        llc_ostlist;
-       struct dt_object        **llc_stripe;
-       __u32                    *llc_ost_indices;
+       union {
+               struct { /* plain layout V1/V3. */
+                       __u32                     llc_pattern;
+                       __u32                     llc_stripe_size;
+                       __u16                     llc_layout_gen;
+                       __u16                     llc_stripe_offset;
+                       __u16                     llc_stripe_count;
+                       __u16                     llc_stripes_allocated;
+                       char                     *llc_pool;
+                       /* ost list specified by LOV_USER_MAGIC_SPECIFIC lum */
+                       struct lu_tgt_pool        llc_ostlist;
+                       struct dt_object        **llc_stripe;
+                       __u32                    *llc_ost_indices;
+               };
+               struct { /* Foreign mirror layout component */
+                       __u32                     llc_length;
+                       __u32                     llc_type;
+                       __u32                     llc_foreign_flags;
+                       union {
+                               /* Basic HSM layout information */
+                               struct lov_hsm_base      llc_hsm;
+                               /* Other kinds of foreign types (i.e. DAOS) */
+                               char                    *llc_value;
+                       };
+               };
+       };
 };
 
 struct lod_default_striping {
@@ -218,7 +234,8 @@ enum layout_verify_flags {
 
 struct lod_mirror_entry {
        __u16   lme_stale:1,
-               lme_prefer:1;
+               lme_prefer:1,
+               lme_hsm:1;
        /* mirror id */
        __u16   lme_id;
        /* preference */
@@ -310,6 +327,12 @@ static inline bool lod_is_flr(const struct lod_object *lo)
        return (lo->ldo_flr_state & LCM_FL_FLR_MASK) != LCM_FL_NONE;
 }
 
+static inline bool lod_is_hsm(const struct lod_layout_component *lod_comp)
+{
+       return lod_comp->llc_magic == LOV_MAGIC_FOREIGN &&
+              lov_hsm_type_supported(lod_comp->llc_type);
+}
+
 static inline bool lod_is_splitting(const struct lod_object *lo)
 {
        return lmv_hash_is_splitting(lo->ldo_dir_hash_type);
@@ -498,6 +521,9 @@ static inline bool lod_obj_is_striped(struct dt_object *dt)
                rc = false;
        } else {
                for (i = 0; i < lo->ldo_comp_cnt; i++) {
+                       if (lo->ldo_comp_entries[i].llc_magic ==
+                           LOV_MAGIC_FOREIGN)
+                               continue;
                        if (lo->ldo_comp_entries[i].llc_stripe == NULL)
                                continue;
                        LASSERT(lo->ldo_comp_entries[i].llc_stripe_count > 0);
@@ -547,6 +573,8 @@ static inline void lod_layout_get_pool(struct lod_layout_component *entries,
        int i;
 
        for (i = 0; i < count; i++) {
+               if (entries[i].llc_magic == LOV_MAGIC_FOREIGN)
+                       continue;
                if (entries[i].llc_pool != NULL) {
                        strlcpy(pool, entries[i].llc_pool, len);
                        break;
@@ -668,6 +696,7 @@ void lod_free_def_comp_entries(struct lod_default_striping *lds);
 void lod_free_comp_entries(struct lod_object *lo);
 int lod_alloc_comp_entries(struct lod_object *lo, int mirror_cnt, int comp_cnt);
 int lod_fill_mirrors(struct lod_object *lo);
+int lod_init_comp_foreign(struct lod_layout_component *lod_comp, void *lmm);
 
 /* lod_pool.c */
 struct pool_desc *lod_find_pool(struct lod_device *lod, char *poolname);
index b8f1b74..101502f 100644 (file)
@@ -482,6 +482,8 @@ static void lod_free_comp_buffer(struct lod_layout_component *entries,
 
        for (i = 0; i < count; i++) {
                entry = &entries[i];
+               if (entry->llc_magic == LOV_MAGIC_FOREIGN)
+                       continue;
                if (entry->llc_pool != NULL)
                        lod_set_pool(&entry->llc_pool, NULL);
                if (entry->llc_ostlist.op_array)
@@ -612,6 +614,7 @@ int lod_fill_mirrors(struct lod_object *lo)
        for (i = 0; i < lo->ldo_comp_cnt; i++, lod_comp++) {
                bool stale = lod_comp->llc_flags & LCME_FL_STALE;
                bool preferred = lod_comp->llc_flags & LCME_FL_PREF_WR;
+               bool mirror_hsm = lod_is_hsm(lod_comp);
                bool init = (lod_comp->llc_stripe != NULL) &&
                            !(lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) &&
                            !(lod_comp->llc_pattern & LOV_PATTERN_MDT);
@@ -647,6 +650,9 @@ int lod_fill_mirrors(struct lod_object *lo)
                }
 
                if (mirror_id_of(lod_comp->llc_id) == mirror_id) {
+                       /* Currently HSM mirror does not support PFL. */
+                       if (lo->ldo_mirrors[mirror_idx].lme_hsm)
+                               RETURN(-EINVAL);
                        lo->ldo_mirrors[mirror_idx].lme_stale |= stale;
                        lo->ldo_mirrors[mirror_idx].lme_prefer |= preferred;
                        lo->ldo_mirrors[mirror_idx].lme_preference += pref;
@@ -663,11 +669,16 @@ int lod_fill_mirrors(struct lod_object *lo)
                if (mirror_idx >= lo->ldo_mirror_count)
                        RETURN(-EINVAL);
 
+               if (mirror_hsm && (lod_comp->llc_extent.e_start != 0 ||
+                                  lod_comp->llc_extent.e_end != LUSTRE_EOF))
+                       RETURN(-EINVAL);
+
                mirror_id = mirror_id_of(lod_comp->llc_id);
 
                lo->ldo_mirrors[mirror_idx].lme_id = mirror_id;
                lo->ldo_mirrors[mirror_idx].lme_stale = stale;
                lo->ldo_mirrors[mirror_idx].lme_prefer = preferred;
+               lo->ldo_mirrors[mirror_idx].lme_hsm = mirror_hsm;
                lo->ldo_mirrors[mirror_idx].lme_preference = pref;
                lo->ldo_mirrors[mirror_idx].lme_start = i;
                lo->ldo_mirrors[mirror_idx].lme_end = i;
@@ -844,6 +855,37 @@ done:
 }
 
 /**
+ * Generate on-disk lov_hsm_md structure based on the information in
+ * the lod_object->ldo_comp_entries.
+ */
+static int lod_gen_component_ea_foreign(const struct lu_env *env,
+                                       struct lod_object *lo,
+                                       struct lod_layout_component *lod_comp,
+                                       void *lmm, int *lmm_size)
+{
+       struct lov_foreign_md *lfm = (struct lov_foreign_md *)lmm;
+
+       ENTRY;
+
+       lfm->lfm_magic = cpu_to_le32(LOV_MAGIC_FOREIGN);
+       lfm->lfm_length = cpu_to_le32(lod_comp->llc_length);
+       lfm->lfm_type = cpu_to_le32(lod_comp->llc_type);
+       lfm->lfm_flags = cpu_to_le32(lod_comp->llc_foreign_flags);
+
+       if (lov_hsm_type_supported(lod_comp->llc_type)) {
+               if (lod_comp->llc_length != sizeof(struct lov_hsm_base))
+                       return -EINVAL;
+
+               lov_foreign_hsm_to_le(lfm, &lod_comp->llc_hsm);
+       }
+
+       if (lmm_size)
+               *lmm_size = lov_foreign_md_size(lod_comp->llc_length);
+
+       RETURN(0);
+}
+
+/**
  * Generate on-disk lov_mds_md structure based on the information in
  * the lod_object->ldo_comp_entries.
  *
@@ -950,7 +992,18 @@ int lod_generate_lovea(const struct lu_env *env, struct lod_object *lo,
                lcme->lcme_offset = cpu_to_le32(offset);
 
                sub_md = (struct lov_mds_md *)((char *)lcm + offset);
-               rc = lod_gen_component_ea(env, lo, i, sub_md, &size, is_dir);
+               if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) {
+                       if (!lov_hsm_type_supported(lod_comp->llc_type)) {
+                               CDEBUG(D_LAYOUT, "Unknown HSM type: %u\n",
+                                      lod_comp->llc_type);
+                               GOTO(out, rc = -EINVAL);
+                       }
+                       rc = lod_gen_component_ea_foreign(env, lo, lod_comp,
+                                                         sub_md, &size);
+               } else {
+                       rc = lod_gen_component_ea(env, lo, i, sub_md,
+                                                 &size, is_dir);
+               }
                if (rc)
                        GOTO(out, rc);
                lcme->lcme_size = cpu_to_le32(size);
@@ -1163,6 +1216,43 @@ out:
        RETURN(rc);
 }
 
+int lod_init_comp_foreign(struct lod_layout_component *lod_comp, void *lmm)
+{
+       struct lov_foreign_md *lfm;
+
+       lfm = (struct lov_foreign_md *)lmm;
+       lod_comp->llc_length = le32_to_cpu(lfm->lfm_length);
+       lod_comp->llc_type = le32_to_cpu(lfm->lfm_type);
+
+       if (!lov_hsm_type_supported(lod_comp->llc_type)) {
+               CDEBUG(D_LAYOUT,
+                      "Unsupport HSM type: %u length: %u flags: %08X\n",
+                      lod_comp->llc_type, lod_comp->llc_length,
+                      le32_to_cpu(lfm->lfm_flags));
+               return -EINVAL;
+       }
+
+       /*
+        * Currently it only stores the file FID as the field @lhm_archive_uuid
+        * which is used to be the identifier within HSM backend for the archive
+        * copy.
+        * Thus the length of foreign layout value (HSM is a kind of foreign
+        * layout type) is: sizeof(lhm_archive_id) + sizeof(lhm_archive_ver) +
+        *                  UUID_MAX
+        * It should fix to support other kinds of identifier for different HSM
+        * solutions such as S3.
+        */
+       if (lod_comp->llc_length != sizeof(struct lov_hsm_base)) {
+               CDEBUG(D_LAYOUT, "Invalid HSM len: %u, should be %zu\n",
+                      lod_comp->llc_length, sizeof(struct lov_hsm_base));
+               return -EINVAL;
+       }
+
+       lod_comp->llc_foreign_flags = le32_to_cpu(lfm->lfm_flags);
+       lov_foreign_hsm_to_cpu(&lod_comp->llc_hsm, lfm);
+       return 0;
+}
+
 /**
  * Instantiate objects for striping.
  *
@@ -1327,7 +1417,16 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
                                      PFID(lod_object_fid(lo)),
                                      le32_to_cpu(comp_v1->lcm_magic));
                        }
+
+                       lod_comp->llc_magic = le32_to_cpu(lmm->lmm_magic);
+                       if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) {
+                               rc = lod_init_comp_foreign(lod_comp, lmm);
+                               if (rc)
+                                       GOTO(out, rc);
+                               continue;
+                       }
                } else {
+                       lod_comp->llc_magic = le32_to_cpu(lmm->lmm_magic);
                        lod_comp_set_init(lod_comp);
                }
 
@@ -2005,10 +2104,10 @@ int lod_verify_striping(const struct lu_env *env, struct lod_device *d,
                        RETURN(-EINVAL);
                }
 
-               if (foreign_size_le(lfm) > buf->lb_len) {
+               if (lov_foreign_size_le(lfm) > buf->lb_len) {
                        CDEBUG(D_LAYOUT,
                               "buf len %zu < this lov_foreign_md size (%zu)\n",
-                              buf->lb_len, foreign_size_le(lfm));
+                              buf->lb_len, lov_foreign_size_le(lfm));
                        RETURN(-EINVAL);
                }
                /* Don't do anything with foreign layouts */
@@ -2126,8 +2225,65 @@ recheck:
                tmp.lb_buf = (char *)comp_v1 + le32_to_cpu(ent->lcme_offset);
                tmp.lb_len = le32_to_cpu(ent->lcme_size);
 
-               /* Check DoM entry is always the first one */
                lum = tmp.lb_buf;
+               if (le32_to_cpu(lum->lmm_magic) == LOV_MAGIC_FOREIGN) {
+                       struct lov_foreign_md *lfm;
+                       struct lov_hsm_md *lhm;
+                       u32 hsmsize;
+                       u32 ftype;
+
+                       /*
+                        * Currently when the foreign layout is used as a basic
+                        * layout component, it only supports HSM foreign types:
+                        * LU_FOREIGN_TYPE_{POSIX, S3, PCCRW, PCCRO}.
+                        */
+                       lfm = (struct lov_foreign_md *)lum;
+                       ftype = le32_to_cpu(lfm->lfm_type);
+                       if (!lov_hsm_type_supported(ftype)) {
+                               CDEBUG(D_LAYOUT,
+                                      "Foreign type %#x is not HSM\n", ftype);
+                               RETURN(-EINVAL);
+                       }
+
+                       /* Current HSM component must cover [0, EOF]. */
+                       if (le64_to_cpu(ext->e_start) > 0) {
+                               CDEBUG(D_LAYOUT, "Invalid HSM component with %llu extent start\n",
+                                      le64_to_cpu(ext->e_start));
+                               RETURN(-EINVAL);
+                       }
+                       if (le64_to_cpu(ext->e_end) != LUSTRE_EOF) {
+                               CDEBUG(D_LAYOUT, "Invalid HSM component with %llu extent end\n",
+                                      le64_to_cpu(ext->e_end));
+                               RETURN(-EINVAL);
+                       }
+
+                       lhm = (struct lov_hsm_md *)lfm;
+                       if (le32_to_cpu(lhm->lhm_length) !=
+                           sizeof(struct lov_hsm_base)) {
+                               CDEBUG(D_LAYOUT,
+                                      "Invalid HSM component size %u != %u\n",
+                                      le32_to_cpu(ent->lcme_size), hsmsize);
+                               RETURN(-EINVAL);
+                       }
+
+                       hsmsize = lov_foreign_size_le(lhm);
+                       if (le32_to_cpu(ent->lcme_size) < hsmsize) {
+                               CDEBUG(D_LAYOUT,
+                                      "Invalid HSM component size %u != %u\n",
+                                      le32_to_cpu(ent->lcme_size), hsmsize);
+                               RETURN(-EINVAL);
+                       }
+                       if (le32_to_cpu(lhm->lhm_flags) & ~HSM_FLAGS_MASK ||
+                           !(le32_to_cpu(lhm->lhm_flags) & HSM_FLAGS_MASK)) {
+                               CDEBUG(D_LAYOUT,
+                                      "Invalid HSM component flags %#x\n",
+                                      le32_to_cpu(lhm->lhm_flags));
+                               RETURN(-EINVAL);
+                       }
+                       continue;
+               }
+
+               /* Check DoM entry is always the first one */
                if (lov_pattern(le32_to_cpu(lum->lmm_pattern)) &
                    LOV_PATTERN_MDT) {
                        /* DoM component must be the first in a mirror */
index b508c25..95f6984 100644 (file)
@@ -1183,6 +1183,9 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
        for (i = 0; i < lo->ldo_comp_cnt; i++) {
                lod_comp = &lo->ldo_comp_entries[i];
 
+               if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN)
+                       continue;
+
                if (lod_comp->llc_stripe == NULL)
                        continue;
 
@@ -2698,13 +2701,19 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
        for (i = 0; i < comp_cnt; i++) {
                __u16 stripe_count;
 
-               magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1;
-               stripe_count = lod_comp_entry_stripe_count(lo, i, is_dir);
-               if (!is_dir && is_composite)
-                       lod_comp_shrink_stripe_count(&comp_entries[i],
-                                                    &stripe_count);
-
-               size += lov_user_md_size(stripe_count, magic);
+               if (comp_entries[i].llc_magic == LOV_MAGIC_FOREIGN) {
+                       size += lov_foreign_md_size(comp_entries[i].llc_length);
+               } else {
+                       magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 :
+                                                          LOV_MAGIC_V1;
+                       stripe_count = lod_comp_entry_stripe_count(lo, i,
+                                                                  is_dir);
+                       if (!is_dir && is_composite)
+                               lod_comp_shrink_stripe_count(&comp_entries[i],
+                                                            &stripe_count);
+
+                       size += lov_user_md_size(stripe_count, magic);
+               }
                LASSERT(size % sizeof(__u64) == 0);
        }
        return size;
@@ -3298,8 +3307,9 @@ out:
  * Merge layouts to form a mirrored file.
  */
 static int lod_declare_layout_merge(const struct lu_env *env,
-               struct dt_object *dt, const struct lu_buf *mbuf,
-               struct thandle *th)
+                                   struct dt_object *dt,
+                                   const struct lu_buf *mbuf,
+                                   struct thandle *th)
 {
        struct lod_thread_info *info = lod_env_info(env);
        struct lu_attr *layout_attr = &info->lti_layout_attr;
@@ -3464,7 +3474,7 @@ static int lod_declare_layout_merge(const struct lu_env *env,
        }
 
        rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), buf,
-                                       XATTR_NAME_LOV, LU_XATTR_REPLACE, th);
+                                      XATTR_NAME_LOV, LU_XATTR_REPLACE, th);
 
 out:
        lu_buf_free(buf);
@@ -4834,6 +4844,9 @@ static int lod_layout_del_prep_layout(const struct lu_env *env,
                        continue;
                }
 
+               if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN)
+                       continue;
+
                lod_obj_set_pool(lo, i, NULL);
                if (lod_comp->llc_ostlist.op_array) {
                        OBD_FREE(lod_comp->llc_ostlist.op_array,
@@ -6363,6 +6376,11 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt,
                if (lod_comp_inited(lod_comp))
                        continue;
 
+               if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) {
+                       lod_comp_set_init(lod_comp);
+                       continue;
+               }
+
                if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
                        lod_comp_set_init(lod_comp);
 
@@ -7438,6 +7456,9 @@ out:
 /* If striping is already instantiated or INIT'ed DOM? */
 static bool lod_is_instantiation_needed(struct lod_layout_component *comp)
 {
+       if (comp->llc_magic == LOV_MAGIC_FOREIGN)
+               return false;
+
        return !(((lov_pattern(comp->llc_pattern) & LOV_PATTERN_MDT) &&
                  lod_comp_inited(comp)) || comp->llc_stripe);
 }
@@ -7652,6 +7673,8 @@ restart:
 
                                lod_comp->llc_flags |= LCME_FL_STALE;
                                lo->ldo_mirrors[i].lme_stale = 1;
+                               if (lod_is_hsm(lod_comp))
+                                       lod_comp->llc_foreign_flags |= HS_DIRTY;
                        }
                }
        }
@@ -7876,6 +7899,254 @@ static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo,
        return need_sync ? 0 : -EALREADY;
 }
 
+static struct lod_layout_component *
+lod_locate_comp_hsm(struct lod_object *lo, int *hsm_mirror_id)
+{
+       struct lod_layout_component *lod_comp = NULL;
+       int i;
+
+       if (!lo->ldo_is_composite)
+               return NULL;
+
+       for (i = 0; i < lo->ldo_mirror_count; i++) {
+               /*
+                * FIXME: In the current design, there is only one HSM
+                * mirror component in range [0, EOF] for a FLR file. This
+                * should be fixed to support multiple HSM mirror components
+                * with different HSM backend types and partial file ranges
+                * in the future.
+                */
+               if (lo->ldo_mirrors[i].lme_hsm) {
+                       __u16 start_idx;
+                       __u16 end_idx;
+
+                       if (hsm_mirror_id)
+                               *hsm_mirror_id = i;
+                       start_idx = lo->ldo_mirrors[i].lme_start;
+                       end_idx = lo->ldo_mirrors[i].lme_end;
+                       LASSERT(start_idx == end_idx);
+                       lod_comp = &lo->ldo_comp_entries[start_idx];
+                       LASSERT(lo->ldo_is_composite && lod_is_hsm(lod_comp) &&
+                               lod_comp->llc_extent.e_start == 0 &&
+                               lod_comp->llc_extent.e_end == LUSTRE_EOF);
+                       break;
+               }
+       }
+
+       return lod_comp;
+}
+
+static int lod_declare_pccro_set(const struct lu_env *env,
+                                struct dt_object *dt, struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lu_buf *buf = &info->lti_buf;
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_layout_component *lod_comp;
+       struct lod_layout_component *comp_array;
+       struct lod_mirror_entry *mirror_array;
+       __u16 mirror_id;
+       int hsm_mirror_id;
+       int mirror_cnt;
+       int new_cnt;
+       int rc;
+       int i;
+
+       ENTRY;
+
+       rc = lod_striping_load(env, lo);
+       if (rc)
+               RETURN(rc);
+
+       if (lo->ldo_flr_state & LCM_FL_PCC_RDONLY)
+               RETURN(-EALREADY);
+
+       rc = lod_layout_data_init(info, lo->ldo_comp_cnt);
+       if (rc)
+               RETURN(rc);
+
+       lod_comp = lod_locate_comp_hsm(lo, &hsm_mirror_id);
+       if (lod_comp) {
+               if (lod_comp->llc_foreign_flags & HS_PCCRO) {
+                       CDEBUG(D_LAYOUT, "bad HSM flags: %#x\n",
+                              lod_comp->llc_foreign_flags);
+                       RETURN(-EINVAL);
+               }
+
+               lod_obj_inc_layout_gen(lo);
+               lod_comp->llc_foreign_flags |= HS_PCCRO;
+               lod_comp->llc_foreign_flags &= ~HS_DIRTY;
+               lod_comp->llc_flags &= ~LCME_FL_STALE;
+               lo->ldo_mirrors[hsm_mirror_id].lme_stale = 0;
+               lo->ldo_flr_state |= LCM_FL_PCC_RDONLY;
+               buf->lb_len = lod_comp_md_size(lo, false);
+               rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+                                              buf, XATTR_NAME_LOV, 0, th);
+               RETURN(rc);
+       }
+
+       /*
+        * Create an new composite layout with only one HSM component.
+        * Field @lhm_archive_uuid is used to be the identifier within HSM
+        * backend for the archive copy. In the PCC case with a POSIX archive,
+        * This can just be the original inode FID. This is important because
+        * the inode FID may change due to layout swaps or migration to a new
+        * MDT, and we do not want that to cause problems with finding the copy
+        * in HSM/PCC.
+        */
+       mirror_cnt = lo->ldo_mirror_count + 1;
+       if (!lo->ldo_is_composite) {
+               LASSERT(lo->ldo_mirror_count == 0);
+               mirror_cnt++;
+       }
+
+       OBD_ALLOC_PTR_ARRAY(mirror_array, mirror_cnt);
+       if (mirror_array == NULL)
+               RETURN(-ENOMEM);
+
+       new_cnt = lo->ldo_comp_cnt + 1;
+       OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt);
+       if (comp_array == NULL) {
+               OBD_FREE_PTR_ARRAY(mirror_array, mirror_cnt);
+               RETURN(-ENOMEM);
+       }
+
+       mirror_id = 0;
+       for (i = 0; i < lo->ldo_comp_cnt; i++) {
+               lod_comp = &lo->ldo_comp_entries[i];
+
+               /*
+                * Add mirror from a non-flr file, create new mirror ID.
+                * Otherwise, keep existing mirror's component ID, used
+                * for mirror extension.
+                */
+               if (lo->ldo_mirror_count == 0 &&
+                   mirror_id_of(lod_comp->llc_id) == 0)
+                       lod_comp->llc_id = pflr_id(1, i + 1);
+
+               if (lod_comp->llc_id != LCME_ID_INVAL &&
+                   mirror_id_of(lod_comp->llc_id) > mirror_id)
+                       mirror_id = mirror_id_of(lod_comp->llc_id);
+
+               if (!lo->ldo_is_composite) {
+                       lod_comp->llc_extent.e_start = 0;
+                       lod_comp->llc_extent.e_end = LUSTRE_EOF;
+                       lod_comp_set_init(lod_comp);
+               }
+       }
+
+       memcpy(comp_array, lo->ldo_comp_entries,
+              sizeof(*comp_array) * lo->ldo_comp_cnt);
+
+       lod_comp = &comp_array[new_cnt - 1];
+       lod_comp->llc_magic = LOV_MAGIC_FOREIGN;
+       lod_comp->llc_extent.e_start = 0;
+       lod_comp->llc_extent.e_end = LUSTRE_EOF;
+       lod_comp->llc_length = sizeof(struct lov_hsm_base);
+       lod_comp->llc_type = LU_FOREIGN_TYPE_PCCRO;
+       lod_comp->llc_foreign_flags = HS_EXISTS | HS_ARCHIVED | HS_PCCRO;
+       memset(&lod_comp->llc_hsm, 0, sizeof(lod_comp->llc_hsm));
+
+       if (lo->ldo_mirrors)
+               OBD_FREE_PTR_ARRAY(lo->ldo_mirrors, lo->ldo_mirror_count);
+       OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt);
+
+       /*
+        * The @ldo_mirror will be refilled by lod_fill_mirrors() when
+        * call lod_striped_create() for layout change.
+        */
+       lo->ldo_mirrors = mirror_array;
+       lo->ldo_mirror_count = mirror_cnt;
+       lo->ldo_comp_entries = comp_array;
+       lo->ldo_comp_cnt = new_cnt;
+       lo->ldo_is_composite = 1;
+
+       ++mirror_id;
+       lod_comp->llc_id = LCME_ID_INVAL;
+       lod_comp->llc_id = lod_gen_component_id(lo, mirror_id, new_cnt - 1);
+
+       if (lo->ldo_flr_state == LCM_FL_NONE)
+               lo->ldo_flr_state = LCM_FL_RDONLY;
+       lo->ldo_flr_state |= LCM_FL_PCC_RDONLY;
+       buf->lb_len = lod_comp_md_size(lo, false);
+       rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+                                      buf, XATTR_NAME_LOV, 0, th);
+       if (rc)
+               lod_striping_free(env, lo);
+
+       RETURN(rc);
+}
+
+/*
+ * TODO: When clear LCM_FL_PCC_RDONLY flag from the layouts, it means the file
+ * is going to be modified. Currently it needs two RPCs: first one is to clear
+ * LCM_FL_PCC_RDONLY flag; the second one is to pick primary mirror and mark
+ * the file as LCM_FL_WRITE_PENDING.
+ * These two RPCs can be combined in one RPC call.
+ */
+static int lod_declare_pccro_clear(const struct lu_env *env,
+                                  struct dt_object *dt, struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_layout_component *lod_comp;
+       int rc;
+
+       ENTRY;
+
+       rc = lod_striping_load(env, lo);
+       if (rc)
+               RETURN(rc);
+
+       if (!(lo->ldo_flr_state & LCM_FL_PCC_RDONLY))
+               RETURN(-EALREADY);
+
+       rc = lod_layout_data_init(info, lo->ldo_comp_cnt);
+       if (rc)
+               RETURN(rc);
+
+       lod_comp = lod_locate_comp_hsm(lo, NULL);
+       if (lod_comp == NULL) {
+               CDEBUG(D_LAYOUT, "Not found any HSM component\n");
+               GOTO(out, rc = -EINVAL);
+       }
+
+       lod_comp->llc_foreign_flags &= ~HS_PCCRO;
+       lo->ldo_flr_state &= ~LCM_FL_PCC_RDONLY;
+       lod_obj_inc_layout_gen(lo);
+       info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+       rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+                                      &info->lti_buf, XATTR_NAME_LOV, 0, th);
+out:
+       if (rc)
+               lod_striping_free(env, lo);
+
+       RETURN(rc);
+}
+
+static int lod_declare_update_pccro(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   struct md_layout_change *mlc,
+                                   struct thandle *th)
+{
+       struct layout_intent *intent = mlc->mlc_intent;
+       int rc;
+
+       switch (intent->li_opc) {
+       case LAYOUT_INTENT_PCCRO_SET:
+               rc = lod_declare_pccro_set(env, dt, th);
+               break;
+       case LAYOUT_INTENT_PCCRO_CLEAR:
+               rc = lod_declare_pccro_clear(env, dt, th);
+               break;
+       default:
+               rc = -EOPNOTSUPP;
+               break;
+       }
+
+       return rc;
+}
+
 static int lod_declare_update_rdonly(const struct lu_env *env,
                struct lod_object *lo, struct md_layout_change *mlc,
                struct thandle *th)
@@ -8034,6 +8305,8 @@ static int lod_declare_update_write_pending(const struct lu_env *env,
                        continue;
                if (lo->ldo_mirrors[i].lme_prefer == 0)
                        continue;
+               if (lo->ldo_mirrors[i].lme_hsm)
+                       continue;
 
                primary = i;
                break;
@@ -8888,6 +9161,19 @@ static int lod_declare_layout_change(const struct lu_env *env,
            dt_object_remote(dt_object_child(dt)))
                RETURN(-EINVAL);
 
+       if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+               struct layout_intent *intent = mlc->mlc_intent;
+
+               if (intent->li_opc == LAYOUT_INTENT_PCCRO_SET ||
+                   intent->li_opc == LAYOUT_INTENT_PCCRO_CLEAR) {
+                       if (!S_ISREG(dt->do_lu.lo_header->loh_attr))
+                               RETURN(-EINVAL);
+
+                       rc = lod_declare_update_pccro(env, dt, mlc, th);
+                       RETURN(rc);
+               }
+       }
+
        rc = lod_striping_load(env, lo);
        if (rc)
                GOTO(out, rc);
@@ -9170,6 +9456,9 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo)
                        /* free lod_layout_component::llc_stripe array */
                        lod_comp = &lo->ldo_comp_entries[i];
 
+                       /* HSM layout component */
+                       if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN)
+                               continue;
                        if (lod_comp->llc_stripe == NULL)
                                continue;
                        LASSERT(lod_comp->llc_stripes_allocated != 0);
index 03cb117..69190ba 100644 (file)
@@ -2128,7 +2128,7 @@ int lod_use_defined_striping(const struct lu_env *env,
                        GOTO(out, rc = -EINVAL);
                }
                foreign = (struct lov_foreign_md *)buf->lb_buf;
-               length = foreign_size_le(foreign);
+               length = lov_foreign_size_le(foreign);
                if (buf->lb_len < length) {
                        CDEBUG(D_LAYOUT,
                               "buf len %zu < this lov_foreign_md size (%zu)\n",
@@ -2179,6 +2179,16 @@ int lod_use_defined_striping(const struct lu_env *env,
                                le32_to_cpu(comp_v1->lcm_entries[i].lcme_id);
                        if (lod_comp->llc_id == LCME_ID_INVAL)
                                GOTO(out, rc = -EINVAL);
+
+                       lod_comp->llc_magic = magic;
+                       if (magic == LOV_MAGIC_FOREIGN) {
+                               rc = lod_init_comp_foreign(lod_comp, v1);
+                               if (rc)
+                                       GOTO(out, rc);
+                               continue;
+                       }
+               } else {
+                       lod_comp->llc_magic = magic;
                }
 
                pool_name = NULL;
@@ -2406,10 +2416,11 @@ int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo,
        case LOV_USER_MAGIC_FOREIGN:
                if (!lfm)
                        lfm = buf->lb_buf;
-               rc = lod_alloc_foreign_lov(lo, foreign_size(lfm));
+               rc = lod_alloc_foreign_lov(lo, lov_foreign_size(lfm));
                if (rc)
                        RETURN(rc);
-               memcpy(lo->ldo_foreign_lov, buf->lb_buf, foreign_size(lfm));
+               memcpy(lo->ldo_foreign_lov, buf->lb_buf,
+                      lov_foreign_size(lfm));
                RETURN(0);
        default:
                CERROR("%s: unrecognized magic %X\n",
@@ -2710,6 +2721,10 @@ int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
        lod_comp = &lo->ldo_comp_entries[comp_idx];
        LASSERT(!(lod_comp->llc_flags & LCME_FL_EXTENSION));
 
+       /* A foreign/HSM component is being created */
+       if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN)
+               RETURN(0);
+
        /* A released component is being created */
        if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
                RETURN(0);
index acf6c55..485a540 100644 (file)
@@ -620,7 +620,7 @@ lov_stripe_md *lsm_unpackmd_foreign(struct lov_obd *lov, void *buf,
        atomic_set(&lsm->lsm_refc, 1);
        spin_lock_init(&lsm->lsm_lock);
        lsm->lsm_magic = le32_to_cpu(lfm->lfm_magic);
-       lsm->lsm_foreign_size = foreign_size_le(lfm);
+       lsm->lsm_foreign_size = lov_foreign_size_le(lfm);
 
        /* alloc for full foreign EA including format fields */
        OBD_ALLOC_LARGE(lsme, lsm->lsm_foreign_size);
index ad40881..222aa76 100644 (file)
@@ -2354,7 +2354,8 @@ static int mdd_layout_swap_allowed(const struct lu_env *env,
 }
 
 /* XXX To set the proper lmm_oi & lmm_layout_gen when swap layouts, we have to
- *     look into the layout in MDD layer. */
+ *     look into the layout in MDD layer.
+ */
 static int mdd_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi, bool get)
 {
        struct lov_comp_md_v1   *comp_v1;
@@ -2370,16 +2371,36 @@ static int mdd_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi, bool get)
                        return -EINVAL;
 
                if (get) {
-                       off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
+                       int i = 0;
+
+                       off = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset);
                        v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
-                       *oi = v1->lmm_oi;
+                       if (le32_to_cpu(v1->lmm_magic) != LOV_MAGIC_FOREIGN) {
+                               *oi = v1->lmm_oi;
+                       } else {
+                               if (ent_count == 1)
+                                       return -EINVAL;
+
+                               i = 1;
+                               off = le32_to_cpu(
+                                       comp_v1->lcm_entries[i].lcme_offset);
+                               v1 = (struct lov_mds_md *)((char *)comp_v1 +
+                                                          off);
+                               if (le32_to_cpu(v1->lmm_magic) ==
+                                                       LOV_MAGIC_FOREIGN)
+                                       return -EINVAL;
+
+                               *oi = v1->lmm_oi;
+                       }
                } else {
                        for (i = 0; i < le32_to_cpu(ent_count); i++) {
                                off = le32_to_cpu(comp_v1->lcm_entries[i].
                                                lcme_offset);
                                v1 = (struct lov_mds_md *)((char *)comp_v1 +
                                                off);
-                               v1->lmm_oi = *oi;
+                               if (le32_to_cpu(v1->lmm_magic) !=
+                                                       LOV_MAGIC_FOREIGN)
+                                       v1->lmm_oi = *oi;
                        }
                }
        } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
@@ -3180,10 +3201,55 @@ out:
 }
 
 /**
+ *  Update the layout for PCC-RO.
+ */
+static int
+mdd_layout_update_pccro(const struct lu_env *env, struct md_object *o,
+                       struct md_layout_change *mlc)
+{
+       struct mdd_object *obj = md2mdd_obj(o);
+       struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
+       struct thandle *handle;
+       int rc;
+
+       ENTRY;
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       /* TODO: Set SOM strict correct when the file is PCC-RO cached. */
+       rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
+       /**
+        * It is possible that another layout write intent has already
+        * set/cleared read-only flag on the object, so as to return
+        * -EALREADY, and we need to do nothing in this case.
+        */
+       if (rc)
+               GOTO(out, rc == -EALREADY ? rc = 0 : rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       mdd_write_lock(env, obj, DT_TGT_CHILD);
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       mdd_write_unlock(env, obj);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle,
+                                     NULL);
+out:
+       mdd_trans_stop(env, mdd, rc, handle);
+
+       RETURN(rc);
+}
+/**
  * Layout change callback for object.
  *
- * This is only used by FLR for now. In the future, it can be exteneded to
- * handle all layout change.
+ * This is only used by FLR and PCC-RO for now. In the future, it can be
+ * exteneded to handle all layout change.
  */
 static int
 mdd_layout_change(const struct lu_env *env, struct md_object *o,
@@ -3216,7 +3282,13 @@ mdd_layout_change(const struct lu_env *env, struct md_object *o,
 
        /* Verify acceptable operations */
        switch (mlc->mlc_opc) {
-       case MD_LAYOUT_WRITE:
+       case MD_LAYOUT_WRITE: {
+               struct layout_intent *intent = mlc->mlc_intent;
+
+               if (intent->li_opc == LAYOUT_INTENT_PCCRO_SET ||
+                   intent->li_opc == LAYOUT_INTENT_PCCRO_CLEAR)
+                       RETURN(mdd_layout_update_pccro(env, o, mlc));
+       }
        case MD_LAYOUT_RESYNC:
        case MD_LAYOUT_RESYNC_DONE:
                break;
index 20cea90..2027012 100644 (file)
@@ -4807,6 +4807,8 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
        switch (intent->li_opc) {
        case LAYOUT_INTENT_TRUNC:
        case LAYOUT_INTENT_WRITE:
+       case LAYOUT_INTENT_PCCRO_SET:
+       case LAYOUT_INTENT_PCCRO_CLEAR:
                layout.mlc_opc = MD_LAYOUT_WRITE;
                layout.mlc_intent = intent;
                break;
index c7f9f22..cd67d54 100644 (file)
@@ -1835,6 +1835,9 @@ static inline int mdt_hsm_set_released(struct lov_mds_md *lmm)
                for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
                        off = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset);
                        v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+                       if (v1->lmm_magic == cpu_to_le32(LOV_MAGIC_FOREIGN))
+                               continue;
+
                        v1->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED);
                }
        } else {
index b9c77c6..6a18c5e 100644 (file)
@@ -2398,6 +2398,16 @@ static void lustre_print_v1v3(unsigned int lvl, struct lov_user_md *lum,
        }
 }
 
+static void lustre_print_foreign(unsigned int lvl, struct lov_foreign_md *lfm,
+                                const char *msg)
+{
+       CDEBUG(lvl, "%s lov_foreign_md %p:\n", msg, lfm);
+       CDEBUG(lvl, "\tlfm_magic: %#X\n", lfm->lfm_magic);
+       CDEBUG(lvl, "\tlfm_length: %u\n", lfm->lfm_length);
+       CDEBUG(lvl, "\tlfm_type: %#X\n", lfm->lfm_type);
+       CDEBUG(lvl, "\tlfm_flags: %#X\n", lfm->lfm_flags);
+}
+
 void lustre_print_user_md(unsigned int lvl, struct lov_user_md *lum,
                          const char *msg)
 {
@@ -2446,7 +2456,11 @@ void lustre_print_user_md(unsigned int lvl, struct lov_user_md *lum,
 
                v1 = (struct lov_user_md *)((char *)comp_v1 +
                                comp_v1->lcm_entries[i].lcme_offset);
-               lustre_print_v1v3(lvl, v1, msg);
+               if (v1->lmm_magic == LOV_MAGIC_FOREIGN)
+                       lustre_print_foreign(lvl, (struct lov_foreign_md *)v1,
+                                            msg);
+               else
+                       lustre_print_v1v3(lvl, v1, msg);
        }
 }
 EXPORT_SYMBOL(lustre_print_user_md);
@@ -2488,6 +2502,22 @@ void lustre_swab_lov_user_md_v3(struct lov_user_md_v3 *lum)
 }
 EXPORT_SYMBOL(lustre_swab_lov_user_md_v3);
 
+static void lustre_swab_lov_hsm_md(struct lov_hsm_md *lhm)
+{
+       ENTRY;
+       CDEBUG(D_IOCTL, "swabbing lov_hsm_md\n");
+       __swab32s(&lhm->lhm_magic);
+       __swab32s(&lhm->lhm_length);
+       __swab32s(&lhm->lhm_type);
+       __swab32s(&lhm->lhm_flags);
+
+       if (lov_hsm_type_supported(lhm->lhm_type)) {
+               __swab64s(&lhm->lhm_archive_id);
+               __swab64s(&lhm->lhm_archive_ver);
+       }
+       EXIT;
+}
+
 void lustre_swab_lov_comp_md_v1(struct lov_comp_md_v1 *lum)
 {
        struct lov_comp_md_entry_v1     *ent;
@@ -2537,6 +2567,12 @@ void lustre_swab_lov_comp_md_v1(struct lov_comp_md_v1 *lum)
                /* no need to swab lcme_cstripe_count */
 
                v1 = (struct lov_user_md_v1 *)((char *)lum + off);
+               if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_FOREIGN) ||
+                   v1->lmm_magic == LOV_USER_MAGIC_FOREIGN) {
+                       lustre_swab_lov_hsm_md((struct lov_hsm_md *)v1);
+                       return;
+               }
+
                stripe_count = v1->lmm_stripe_count;
                if (!cpu_endian)
                        __swab16s(&stripe_count);
index 0b9a7e9..bf10d28 100644 (file)
@@ -886,6 +886,30 @@ check_lov_foreign_md(void)
 }
 
 static void
+check_lov_hsm_base(void)
+{
+       BLANK_LINKE();
+       CHECK_STRUCT(lov_hsm_base);
+       CHECK_MEMBER(lov_hsm_base, lhb_archive_id);
+       CHECK_MEMBER(lov_hsm_base, lhb_archive_ver);
+       CHECK_MEMBER(lov_hsm_base, lhb_uuid[0]);
+}
+
+static void
+check_lov_hsm_md(void)
+{
+       BLANK_LINE();
+       CHECK_STRUCT(lov_hsm_md);
+       CHECK_MEMBER(lov_hsm_md, lhm_magic);
+       CHECK_MEMBER(lov_hsm_md, lhm_length);
+       CHECK_MEMBER(lov_hsm_md, lhm_type);
+       CHECK_MEMBER(lov_hsm_md, lhm_flags);
+       CHECK_MEMBER(lov_hsm_md, lhm_archive_id);
+       CHECK_MEMBER(lov_hsm_md, lhm_archive_ver);
+       CHECK_MEMBER(lov_hsm_md, lhm_archive_uuid[0]);
+}
+
+static void
 check_lov_comp_md_entry_v1(void)
 {
        BLANK_LINE();
@@ -3478,6 +3502,8 @@ main(int argc, char **argv)
        check_lov_mds_md_v1();
        check_lov_mds_md_v3();
        check_lov_foreign_md();
+       check_lov_hsm_base();
+       check_lov_hsm_md();
        check_lov_comp_md_entry_v1();
        check_lov_comp_md_v1();
        check_lmv_mds_md_v1();