Whamcloud - gitweb
LU-9771 flr: Send write intent RPC to mdt 91/29091/14
authorJinshan Xiong <jinshan.xiong@intel.com>
Mon, 13 Nov 2017 23:34:14 +0000 (23:34 +0000)
committerJinshan Xiong <jinshan.xiong@intel.com>
Mon, 20 Nov 2017 19:18:05 +0000 (19:18 +0000)
When a mirrored file is going to be written, the client needs
to send a write intent RPC to the MDT. The MDT will pick a mirror
as primary and mark the others as stale. The new md operation
moo_layout_change() is introduced for this purpose. The MDT also
transfers the latest layout version to the OST objects via
do_attr_set().

Once the OSTs receive the setattr RPC for a layout version change, they
will store the updated layout version in the extended attribute
XATTR_NAME_FID.

Test-Parameters: testlist=sanity-flr
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: Ib0049a78b95895141b0032e8eff526a73a160dcb
Reviewed-on: https://review.whamcloud.com/29091
Tested-by: Jenkins
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
33 files changed:
lustre/include/cl_object.h
lustre/include/lu_object.h
lustre/include/lustre_fid.h
lustre/include/lustre_osc.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/llite/vvp_object.c
lustre/lod/lod_dev.c
lustre/lod/lod_internal.h
lustre/lod/lod_object.c
lustre/lod/lod_qos.c
lustre/lov/lov_cl_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_object.c
lustre/lov/lov_page.c
lustre/mdd/mdd_object.c
lustre/obdclass/llog_swab.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_io.c
lustre/ofd/ofd_obd.c
lustre/ofd/ofd_objects.c
lustre/osc/osc_cache.c
lustre/osc/osc_io.c
lustre/osc/osc_request.c
lustre/osp/osp_object.c
lustre/osp/osp_sync.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/wiretest.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index d8ed065..61338cd 100644 (file)
@@ -1796,6 +1796,8 @@ struct cl_io {
         struct cl_lockset              ci_lockset;
         /** lock requirements, this is just a help info for sublayers. */
         enum cl_io_lock_dmd            ci_lockreq;
+       /** layout version when this IO occurs */
+       __u32                           ci_layout_version;
         union {
                struct cl_rw_io {
                        struct iov_iter          rw_iter;
@@ -1871,8 +1873,10 @@ struct cl_io {
         */
                             ci_ignore_layout:1,
        /**
-        * Need MDS intervention to complete a write. This usually means the
-        * corresponding component is not initialized for the writing extent.
+        * Need MDS intervention to complete a write.
+        * Write intent is required for the following cases:
+        * 1. component being written is not initialized, or
+        * 2. the mirrored files are NOT in WRITE_PENDING state.
         */
                             ci_need_write_intent:1,
        /**
index 50bad11..e101c7f 100644 (file)
@@ -426,6 +426,8 @@ struct lu_attr {
         __u32          la_rdev;
        /** project id */
        __u32          la_projid;
+       /** set layout version to OST objects. */
+       __u32           la_layout_version;
 };
 
 /** Bit-mask of valid attributes */
@@ -446,6 +448,11 @@ enum la_valid {
         LA_KILL_SUID = 1 << 13,
         LA_KILL_SGID = 1 << 14,
        LA_PROJID    = 1 << 15,
+       LA_LAYOUT_VERSION = 1 << 16,
+       /**
+        * Attributes must be transmitted to OST objects
+        */
+       LA_REMOTE_ATTR_SET = (LA_UID | LA_GID | LA_PROJID | LA_LAYOUT_VERSION)
 };
 
 /**
index 402cbef..e34ac94 100644 (file)
@@ -351,10 +351,13 @@ static inline void filter_fid_cpu_to_le(struct filter_fid *dst,
 {
        fid_cpu_to_le(&dst->ff_parent, &src->ff_parent);
 
-       if (size < sizeof(struct filter_fid))
+       if (size < sizeof(struct filter_fid)) {
                memset(&dst->ff_layout, 0, sizeof(dst->ff_layout));
-       else
+       } else {
                ost_layout_cpu_to_le(&dst->ff_layout, &src->ff_layout);
+               dst->ff_layout_version = cpu_to_le32(src->ff_layout_version);
+               dst->ff_range = cpu_to_le32(src->ff_range);
+       }
 
        /* XXX: Add more if filter_fid is enlarged in the future. */
 }
@@ -364,10 +367,13 @@ static inline void filter_fid_le_to_cpu(struct filter_fid *dst,
 {
        fid_le_to_cpu(&dst->ff_parent, &src->ff_parent);
 
-       if (size < sizeof(struct filter_fid))
+       if (size < sizeof(struct filter_fid)) {
                memset(&dst->ff_layout, 0, sizeof(dst->ff_layout));
-       else
+       } else {
                ost_layout_le_to_cpu(&dst->ff_layout, &src->ff_layout);
+               dst->ff_layout_version = le32_to_cpu(src->ff_layout_version);
+               dst->ff_range = le32_to_cpu(src->ff_range);
+       }
 
        /* XXX: Add more if filter_fid is enlarged in the future. */
 }
index 0bb766c..734566d 100644 (file)
@@ -594,8 +594,9 @@ int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj,
                            struct osc_page *ops);
 int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
                         struct osc_page *ops);
-int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
-                        struct list_head *list, int brw_flags);
+int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+                        struct osc_object *obj, struct list_head *list,
+                        int brw_flags);
 int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
                             __u64 size, struct osc_extent **extp);
 void osc_cache_truncate_end(const struct lu_env *env, struct osc_extent *ext);
@@ -965,6 +966,8 @@ struct osc_extent {
        int                     oe_rc;
        /** max pages per rpc when this extent was created */
        unsigned int            oe_mppr;
+       /** FLR: layout version when this osc_extent is published */
+       __u32                   oe_layout_version;
 };
 
 /** @} osc */
index 9497830..528b7d0 100644 (file)
@@ -771,9 +771,10 @@ static inline int it_to_lock_mode(struct lookup_intent *it)
        /* CREAT needs to be tested before open (both could be set) */
        if (it->it_op & IT_CREAT)
                return LCK_CW;
-       else if (it->it_op & (IT_GETATTR | IT_OPEN | IT_LOOKUP |
-                             IT_LAYOUT))
+       else if (it->it_op & (IT_GETATTR | IT_OPEN | IT_LOOKUP))
                return LCK_CR;
+       else if (it->it_op & IT_LAYOUT)
+               return (it->it_flags & FMODE_WRITE) ? LCK_EX : LCK_CR;
        else if (it->it_op &  IT_READDIR)
                return LCK_PR;
        else if (it->it_op &  IT_GETXATTR)
index ee48781..1158ec5 100644 (file)
@@ -610,6 +610,8 @@ extern char obd_jobid_var[];
 
 /* FLR */
 #define OBD_FAIL_FLR_GLIMPSE_IMMUTABLE         0x1A00
+#define OBD_FAIL_FLR_LV_DELAY                  0x1A01
+#define OBD_FAIL_FLR_LV_INC                    0x1A02
 
 /* DT */
 #define OBD_FAIL_DT_DECLARE_ATTR_GET           0x2000
index d8aa186..23b13bd 100644 (file)
@@ -1174,7 +1174,8 @@ lov_mds_md_max_stripe_count(size_t buf_size, __u32 lmm_magic)
 #define OBD_MD_DOM_SIZE    (0X00001000ULL) /* Data-on-MDT component size */
 #define OBD_MD_FLNLINK     (0x00002000ULL) /* link count */
 #define OBD_MD_FLGENER     (0x00004000ULL) /* generation number */
-/*#define OBD_MD_FLINLINE    (0x00008000ULL)  inline data. used until 1.6.5 */
+#define OBD_MD_LAYOUT_VERSION (0x00008000ULL) /* layout version for
+                                              * OST objects */
 #define OBD_MD_FLRDEV      (0x00010000ULL) /* device number */
 #define OBD_MD_FLEASIZE    (0x00020000ULL) /* extended attribute data */
 #define OBD_MD_LINKNAME    (0x00040000ULL) /* symbolic link target */
@@ -2683,7 +2684,7 @@ struct llog_setattr64_rec_v2 {
        __u32                   lsr_gid_h;
        __u64                   lsr_valid;
        __u32                   lsr_projid;
-       __u32                   lsr_padding1;
+       __u32                   lsr_layout_version;
        __u64                   lsr_padding2;
        __u64                   lsr_padding3;
        struct llog_rec_tail    lsr_tail;
@@ -2922,7 +2923,7 @@ struct obdo {
         *
         * sizeof(ost_layout) + sieof(__u32) == sizeof(llog_cookie). */
        struct ost_layout       o_layout;
-       __u32                   o_padding_3;
+       __u32                   o_layout_version;
        __u32                   o_uid_h;
        __u32                   o_gid_h;
 
index c400d20..c538657 100644 (file)
@@ -193,6 +193,9 @@ struct filter_fid_old {
 struct filter_fid {
        struct lu_fid           ff_parent;
        struct ost_layout       ff_layout;
+       __u32                   ff_layout_version;
+       __u32                   ff_range; /* range of layout versions for
+                                          * which writes are allowed */
 } __attribute__((packed));
 
 /* Userspace should treat lu_fid as opaque, and only use the following methods
@@ -550,6 +553,10 @@ enum lov_comp_md_entry_flags {
 
 #define LCME_KNOWN_FLAGS       (LCME_FL_NEG | LCME_FL_INIT)
 
+/* the highest bit in obdo::o_layout_version is used to mark if the file is
+ * being resynced. */
+#define LU_LAYOUT_RESYNC       LCME_FL_NEG
+
 /* lcme_id can be specified as certain flags, and the the first
  * bit of lcme_id is used to indicate that the ID is representing
  * certain LCME_FL_* but not a real ID. Which implies we can have
@@ -945,6 +952,8 @@ enum changelog_rec_type {
        CL_CTIME    = 18,
        CL_ATIME    = 19,
        CL_MIGRATE  = 20,
+       CL_FLRW     = 21, /* FLR: file was firstly written */
+       CL_RESYNC   = 22, /* FLR: file was resync-ed */
        CL_LAST
 };
 
@@ -952,7 +961,8 @@ static inline const char *changelog_type2str(int type) {
        static const char *changelog_str[] = {
                "MARK",  "CREAT", "MKDIR", "HLINK", "SLINK", "MKNOD", "UNLNK",
                "RMDIR", "RENME", "RNMTO", "OPEN",  "CLOSE", "LYOUT", "TRUNC",
-               "SATTR", "XATTR", "HSM",   "MTIME", "CTIME", "ATIME", "MIGRT"
+               "SATTR", "XATTR", "HSM",   "MTIME", "CTIME", "ATIME", "MIGRT",
+               "FLRW",  "RESYNC",
        };
 
        if (type >= 0 && type < CL_LAST)
index 8904e45..6ca4212 100644 (file)
@@ -169,6 +169,13 @@ static int vvp_prune(const struct lu_env *env, struct cl_object *obj)
        }
 
        truncate_inode_pages(inode->i_mapping, 0);
+       if (inode->i_mapping->nrpages) {
+               CDEBUG(D_VFSTRACE, DFID ": still has %lu pages remaining\n",
+                      PFID(lu_object_fid(&obj->co_lu)),
+                      inode->i_mapping->nrpages);
+               RETURN(-EIO);
+       }
+
        RETURN(0);
 }
 
index 5f80430..94d4a33 100644 (file)
@@ -1865,6 +1865,10 @@ static void lod_key_fini(const struct lu_context *ctx,
        if (inuse->op_size)
                OBD_FREE(inuse->op_array, inuse->op_size);
 
+       if (info->lti_comp_size > 0)
+               OBD_FREE(info->lti_comp_idx,
+                        info->lti_comp_size * sizeof(__u32));
+
        OBD_FREE_PTR(info);
 }
 
index b475827..af6b736 100644 (file)
@@ -408,6 +408,10 @@ struct lod_thread_info {
        /* used to store parent default striping in create */
        struct lod_default_striping     lti_def_striping;
        struct filter_fid lti_ff;
+       __u32                           *lti_comp_idx;
+       size_t                          lti_comp_size;
+       size_t                          lti_count;
+       struct lu_attr                  lti_layout_attr;
 };
 
 extern const struct lu_device_operations lod_lu_ops;
@@ -450,6 +454,11 @@ static inline struct lu_object *lod2lu_obj(struct lod_object *obj)
        return &obj->ldo_obj.do_lu;
 }
 
+static inline const struct lu_fid *lod_object_fid(struct lod_object *obj)
+{
+       return lu_object_fid(lod2lu_obj(obj));
+}
+
 static inline struct lod_object *lod_obj(const struct lu_object *o)
 {
        LASSERT(lu_device_is_lod(o->lo_dev));
@@ -637,18 +646,25 @@ int lod_pool_new(struct obd_device *obd, char *poolname);
 int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname);
 int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname);
 
+struct lod_obj_stripe_cb_data;
+typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env,
+                                  struct lod_object *lo, struct dt_object *dt,
+                                  struct thandle *th,
+                                  int comp_idx, int stripe_idx,
+                                  struct lod_obj_stripe_cb_data *data);
+typedef bool (*lod_obj_comp_skip_cb_t)(const struct lu_env *env,
+                                       struct lod_object *lo, int comp_idx,
+                                       struct lod_obj_stripe_cb_data *data);
 struct lod_obj_stripe_cb_data {
        union {
                const struct lu_attr    *locd_attr;
                struct ost_pool         *locd_inuse;
        };
-       bool    locd_declare;
+       lod_obj_stripe_cb_t             locd_stripe_cb;
+       lod_obj_comp_skip_cb_t          locd_comp_skip_cb;
+       bool                            locd_declare;
 };
 
-typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env,
-                                  struct lod_object *lo, struct dt_object *dt,
-                                  struct thandle *th, int stripe_idx,
-                                  struct lod_obj_stripe_cb_data *data);
 /* lod_qos.c */
 int lod_prepare_inuse(const struct lu_env *env, struct lod_object *lo);
 int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
@@ -661,7 +677,7 @@ int lod_use_defined_striping(const struct lu_env *, struct lod_object *,
                             const struct lu_buf *);
 int lod_obj_stripe_set_inuse_cb(const struct lu_env *env, struct lod_object *lo,
                                struct dt_object *dt, struct thandle *th,
-                               int stripe_idx,
+                               int comp_idx, int stripe_idx,
                                struct lod_obj_stripe_cb_data *data);
 int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo,
                         const struct lu_buf *buf);
@@ -693,7 +709,7 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt,
 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo);
 
 int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
-                           struct thandle *th, lod_obj_stripe_cb_t cb,
+                           struct thandle *th,
                            struct lod_obj_stripe_cb_data *data);
 
 /* lod_sub_object.c */
index 5cde03d..b0e940f 100644 (file)
@@ -1047,7 +1047,7 @@ static int lod_attr_get(const struct lu_env *env,
 }
 
 int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
-                           struct thandle *th, lod_obj_stripe_cb_t cb,
+                           struct thandle *th,
                            struct lod_obj_stripe_cb_data *data)
 {
        struct lod_layout_component *lod_comp;
@@ -1061,13 +1061,23 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
                if (lod_comp->llc_stripe == NULL)
                        continue;
 
+               /* has stripes but is not inited yet: this component has been
+                * declared to be created, but hasn't been created yet.
+                */
+               if (!lod_comp_inited(lod_comp))
+                       continue;
+
+               if (data->locd_comp_skip_cb &&
+                   data->locd_comp_skip_cb(env, lo, i, data))
+                       continue;
+
                LASSERT(lod_comp->llc_stripe_count > 0);
                for (j = 0; j < lod_comp->llc_stripe_count; j++) {
                        struct dt_object *dt = lod_comp->llc_stripe[j];
 
                        if (dt == NULL)
                                continue;
-                       rc = cb(env, lo, dt, th, j, data);
+                       rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data);
                        if (rc != 0)
                                RETURN(rc);
                }
@@ -1075,10 +1085,63 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
        RETURN(0);
 }
 
+static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env,
+               struct lod_object *lo, int comp_idx,
+               struct lod_obj_stripe_cb_data *data)
+{
+       struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx];
+       bool skipped = false;
+
+       if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION))
+               return skipped;
+
+       switch (lo->ldo_flr_state) {
+       case LCM_FL_WRITE_PENDING: {
+               int i;
+
+               /* skip stale components */
+               if (lod_comp->llc_flags & LCME_FL_STALE) {
+                       skipped = true;
+                       break;
+               }
+
+               /* skip valid and overlapping components, therefore any
+                * attempts to write overlapped components will never succeed
+                * because client will get EINPROGRESS. */
+               for (i = 0; i < lo->ldo_comp_cnt; i++) {
+                       if (i == comp_idx)
+                               continue;
+
+                       if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE)
+                               continue;
+
+                       if (lu_extent_is_overlapped(&lod_comp->llc_extent,
+                                       &lo->ldo_comp_entries[i].llc_extent)) {
+                               skipped = true;
+                               break;
+                       }
+               }
+               break;
+       }
+       default:
+               LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state);
+       case LCM_FL_SYNC_PENDING:
+               break;
+       }
+
+       CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n",
+              PFID(lu_object_fid(&lo->ldo_obj.do_lu)),
+              skipped ? "skipped" : "chose", lod_comp->llc_id,
+              data->locd_attr->la_layout_version);
+
+       return skipped;
+}
+
 static inline int
 lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo,
                           struct dt_object *dt, struct thandle *th,
-                          int stripe_idx, struct lod_obj_stripe_cb_data *data)
+                          int comp_idx, int stripe_idx,
+                          struct lod_obj_stripe_cb_data *data)
 {
        if (data->locd_declare)
                return lod_sub_declare_attr_set(env, dt, data->locd_attr, th);
@@ -1120,7 +1183,7 @@ static int lod_declare_attr_set(const struct lu_env *env,
         * speed up rename().
         */
        if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
-               if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+               if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
                        RETURN(rc);
 
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
@@ -1157,12 +1220,12 @@ static int lod_declare_attr_set(const struct lu_env *env,
                                RETURN(rc);
                }
        } else {
-               struct lod_obj_stripe_cb_data data;
+               struct lod_obj_stripe_cb_data data = { { 0 } };
 
                data.locd_attr = attr;
                data.locd_declare = true;
-               rc = lod_obj_for_each_stripe(env, lo, th,
-                               lod_obj_stripe_attr_set_cb, &data);
+               data.locd_stripe_cb = lod_obj_stripe_attr_set_cb;
+               rc = lod_obj_for_each_stripe(env, lo, th, &data);
        }
 
        if (rc)
@@ -1217,7 +1280,7 @@ static int lod_attr_set(const struct lu_env *env,
                RETURN(rc);
 
        if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
-               if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+               if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
                        RETURN(rc);
 
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
@@ -1229,6 +1292,14 @@ static int lod_attr_set(const struct lu_env *env,
                        RETURN(rc);
        }
 
+       /* FIXME: a tricky case in the code path of mdd_layout_change():
+        * the in-memory striping information has been freed in lod_xattr_set()
+        * due to layout change. It has to load stripe here again. It only
+        * changes flags of layout so declare_attr_set() is still accurate */
+       rc = lod_load_striping_locked(env, lo);
+       if (rc)
+               RETURN(rc);
+
        if (!lod_obj_is_striped(dt))
                RETURN(0);
 
@@ -1249,12 +1320,13 @@ static int lod_attr_set(const struct lu_env *env,
                                break;
                }
        } else {
-               struct lod_obj_stripe_cb_data data;
+               struct lod_obj_stripe_cb_data data = { { 0 } };
 
                data.locd_attr = attr;
                data.locd_declare = false;
-               rc = lod_obj_for_each_stripe(env, lo, th,
-                               lod_obj_stripe_attr_set_cb, &data);
+               data.locd_comp_skip_cb = lod_obj_attr_set_comp_skip_cb;
+               data.locd_stripe_cb = lod_obj_stripe_attr_set_cb;
+               rc = lod_obj_for_each_stripe(env, lo, th, &data);
        }
 
        if (rc)
@@ -2057,7 +2129,7 @@ static int
 lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env,
                                     struct lod_object *lo,
                                     struct dt_object *dt, struct thandle *th,
-                                    int stripe_idx,
+                                    int comp_idx, int stripe_idx,
                                     struct lod_obj_stripe_cb_data *data)
 {
        struct lod_thread_info *info = lod_env_info(env);
@@ -2110,7 +2182,7 @@ static int lod_replace_parent_fid(const struct lu_env *env,
        struct lod_thread_info  *info = lod_env_info(env);
        struct lu_buf *buf = &info->lti_buf;
        struct filter_fid *ff;
-       struct lod_obj_stripe_cb_data data;
+       struct lod_obj_stripe_cb_data data = { { 0 } };
        int rc;
        ENTRY;
 
@@ -2134,9 +2206,8 @@ static int lod_replace_parent_fid(const struct lu_env *env,
        buf->lb_len = info->lti_ea_store_size;
 
        data.locd_declare = declare;
-       rc = lod_obj_for_each_stripe(env, lo, th,
-                                    lod_obj_stripe_replace_parent_fid_cb,
-                                    &data);
+       data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb;
+       rc = lod_obj_for_each_stripe(env, lo, th, &data);
 
        RETURN(rc);
 }
@@ -4611,7 +4682,8 @@ static int lod_create(const struct lu_env *env, struct dt_object *dt,
 static inline int
 lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo,
                          struct dt_object *dt, struct thandle *th,
-                         int stripe_idx, struct lod_obj_stripe_cb_data *data)
+                         int comp_idx, int stripe_idx,
+                         struct lod_obj_stripe_cb_data *data)
 {
        if (data->locd_declare)
                return lod_sub_declare_destroy(env, dt, th);
@@ -4703,11 +4775,11 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt,
                                break;
                }
        } else {
-               struct lod_obj_stripe_cb_data data;
+               struct lod_obj_stripe_cb_data data = { { 0 } };
 
                data.locd_declare = true;
-               rc = lod_obj_for_each_stripe(env, lo, th,
-                               lod_obj_stripe_destroy_cb, &data);
+               data.locd_stripe_cb = lod_obj_stripe_destroy_cb;
+               rc = lod_obj_for_each_stripe(env, lo, th, &data);
        }
 
        RETURN(rc);
@@ -4793,11 +4865,11 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt,
                        }
                }
        } else {
-               struct lod_obj_stripe_cb_data data;
+               struct lod_obj_stripe_cb_data data = { { 0 } };
 
                data.locd_declare = false;
-               rc = lod_obj_for_each_stripe(env, lo, th,
-                               lod_obj_stripe_destroy_cb, &data);
+               data.locd_stripe_cb = lod_obj_stripe_destroy_cb;
+               rc = lod_obj_for_each_stripe(env, lo, th, &data);
        }
 
        RETURN(rc);
@@ -5055,30 +5127,78 @@ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt)
        return dt_invalidate(env, dt_object_child(dt));
 }
 
-static int lod_declare_layout_change(const struct lu_env *env,
-                                    struct dt_object *dt,
-                                    struct md_layout_change *mlc,
-                                    struct thandle *th)
+static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt)
 {
-       struct lod_thread_info  *info = lod_env_info(env);
-       struct lod_object *lo = lod_dt_obj(dt);
-       struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
-       struct dt_object *next = dt_object_child(dt);
+       ENTRY;
+
+       /* clear memory region that will be used for layout change */
+       memset(&info->lti_layout_attr, 0, sizeof(struct lu_attr));
+       info->lti_count = 0;
+
+       if (info->lti_comp_size >= comp_cnt)
+               RETURN(0);
+
+       if (info->lti_comp_size > 0) {
+               OBD_FREE(info->lti_comp_idx,
+                        info->lti_comp_size * sizeof(__u32));
+               info->lti_comp_size = 0;
+       }
+
+       OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32));
+       if (!info->lti_comp_idx)
+               RETURN(-ENOMEM);
+
+       info->lti_comp_size = comp_cnt;
+       RETURN(0);
+}
+
+static int lod_declare_instantiate_components(const struct lu_env *env,
+               struct lod_object *lo, struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
        struct ost_pool *inuse = &info->lti_inuse_osts;
-       struct layout_intent *layout = mlc->mlc_intent;
-       struct lu_buf *buf = &mlc->mlc_buf;
+       int i;
+       int rc = 0;
+       ENTRY;
+
+       LASSERT(info->lti_count < lo->ldo_comp_cnt);
+       if (info->lti_count > 0) {
+               /* Prepare inuse array for composite file */
+               rc = lod_prepare_inuse(env, lo);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       for (i = 0; i < info->lti_count; i++) {
+               rc = lod_qos_prep_create(env, lo, NULL, th,
+                                        info->lti_comp_idx[i], inuse);
+               if (rc)
+                       break;
+       }
+
+       if (!rc) {
+               info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+               rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+                               &info->lti_buf, XATTR_NAME_LOV, 0, th);
+       }
+
+       RETURN(rc);
+}
+
+static int lod_declare_update_plain(const struct lu_env *env,
+               struct lod_object *lo, struct layout_intent *layout,
+               const struct lu_buf *buf, struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
        struct lod_layout_component *lod_comp;
        struct lov_comp_md_v1 *comp_v1 = NULL;
        bool replay = false;
-       bool need_create = false;
        int i, rc;
        ENTRY;
 
-       if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
-           dt_object_remote(next))
-               RETURN(-EINVAL);
+       LASSERT(lo->ldo_flr_state == LCM_FL_NOT_FLR);
 
-       dt_write_lock(env, next, 0);
        /*
         * In case the client is passing lovea, which only happens during
         * the replay of layout intent write RPC for now, we may need to
@@ -5113,11 +5233,6 @@ static int lod_declare_layout_change(const struct lu_env *env,
                rc = lod_load_striping_locked(env, lo);
                if (rc)
                        GOTO(out, rc);
-
-               /* Prepare inuse array for composite file */
-               rc = lod_prepare_inuse(env, lo);
-               if (rc)
-                       GOTO(out, rc);
        }
 
        /* Make sure defined layout covers the requested write range. */
@@ -5134,7 +5249,7 @@ static int lod_declare_layout_change(const struct lu_env *env,
        }
 
        CDEBUG(D_LAYOUT, "%s: "DFID": instantiate components "DEXT"\n",
-              lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu)),
+              lod2obd(d)->obd_name, PFID(lod_object_fid(lo)),
               PEXT(&layout->li_extent));
 
        /*
@@ -5170,30 +5285,292 @@ static int lod_declare_layout_change(const struct lu_env *env,
                if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
                        GOTO(out, rc = -EINVAL);
 
-               need_create = true;
+               LASSERT(info->lti_comp_idx != NULL);
+               info->lti_comp_idx[info->lti_count++] = i;
+       }
+
+       if (info->lti_count == 0)
+               RETURN(-EALREADY);
 
-               rc = lod_qos_prep_create(env, lo, NULL, th, i, inuse);
-               if (rc)
+       lod_obj_inc_layout_gen(lo);
+       rc = lod_declare_instantiate_components(env, lo, th);
+out:
+       if (rc)
+               lod_object_free_striping(env, lo);
+       RETURN(rc);
+}
+
+/*
+ * Iterate \a comp over the layout components of mirror \a mirror_idx of
+ * \a lo, from the mirror's lme_start entry through its lme_end entry
+ * (inclusive).
+ *
+ * Every macro argument is parenthesized so that expression arguments
+ * expand as intended. \a lo and \a mirror_idx are evaluated on each
+ * iteration, so they must be free of side effects.
+ */
+#define lod_foreach_mirror_comp(comp, lo, mirror_idx)                      \
+for ((comp) = &(lo)->ldo_comp_entries[                                     \
+                       (lo)->ldo_mirrors[(mirror_idx)].lme_start];        \
+     (comp) <= &(lo)->ldo_comp_entries[                                    \
+                       (lo)->ldo_mirrors[(mirror_idx)].lme_end];          \
+     (comp)++)
+
+/**
+ * Return the position of \a lod_comp within lo->ldo_comp_entries.
+ *
+ * \a lod_comp is asserted to point at one of the lo->ldo_comp_cnt entries
+ * of that array.
+ */
+static inline int lod_comp_index(struct lod_object *lo,
+                                struct lod_layout_component *lod_comp)
+{
+       struct lod_layout_component *first = lo->ldo_comp_entries;
+       struct lod_layout_component *last = &first[lo->ldo_comp_cnt - 1];
+
+       LASSERT(first <= lod_comp && lod_comp <= last);
+
+       return lod_comp - first;
+}
+
+/**
+ * Mark components of the non-primary mirrors stale for a write to \a extent.
+ *
+ * Each component of mirror \a primary that overlaps \a extent is matched
+ * against the components of every other mirror; each overlapping one gets
+ * LCME_FL_STALE set and its mirror is flagged lme_stale.
+ */
+static void lod_stale_components(struct lod_object *lo, int primary,
+                                struct lu_extent *extent)
+{
+       struct lod_layout_component *src, *victim;
+       int mirror;
+
+       lod_foreach_mirror_comp(src, lo, primary) {
+               /* primary components outside the write range stale nothing */
+               if (!lu_extent_is_overlapped(extent, &src->llc_extent))
+                       continue;
+
+               for (mirror = 0; mirror < lo->ldo_mirror_count; mirror++) {
+                       if (mirror == primary)
+                               continue;
+
+                       lod_foreach_mirror_comp(victim, lo, mirror) {
+                               /* only components sharing a range with
+                                * this primary component go stale */
+                               if (!lu_extent_is_overlapped(
+                                                       &src->llc_extent,
+                                                       &victim->llc_extent))
+                                       continue;
+
+                               CDEBUG(D_LAYOUT, "stale: %u / %u\n",
+                                     mirror, lod_comp_index(lo, victim));
+
+                               lo->ldo_mirrors[mirror].lme_stale = 1;
+                               victim->llc_flags |= LCME_FL_STALE;
+                       }
+               }
+       }
+}
+
+/**
+ * Declare the layout update for a write to a FLR file in RDONLY state.
+ *
+ * The first mirror without lme_stale becomes the primary (-ENODATA if there
+ * is none). Overlapping components of the other mirrors are marked stale,
+ * and the components of the primary for which lod_comp_inited() is false
+ * are recorded in info->lti_comp_idx, walking from the start of the mirror
+ * until the first component that does not overlap [0, write end).
+ * The in-memory state then moves to LCM_FL_WRITE_PENDING with a bumped
+ * layout generation, and both the component instantiation and a
+ * LA_LAYOUT_VERSION attribute update are declared in \a th.
+ *
+ * If one of the declarations fails, the striping of \a lo is freed.
+ */
+static int lod_declare_update_rdonly(const struct lu_env *env,
+               struct lod_object *lo, struct md_layout_change *mlc,
+               struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lu_attr *layout_attr = &info->lti_layout_attr;
+       struct lod_layout_component *lod_comp;
+       struct layout_intent *layout = mlc->mlc_intent;
+       struct lu_extent extent = layout->li_extent;
+       int picked;
+       int i;
+       int rc;
+       ENTRY;
+
+       LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE);
+       LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY);
+       LASSERT(lo->ldo_mirror_count > 0);
+
+       CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n",
+              PFID(lod_object_fid(lo)), PEXT(&extent));
+
+       /**
+        * Pick a mirror as the primary.
+        * Now it only picks the first mirror, this algo can be
+        * revised later after knowing the topology of cluster or
+        * the availability of OSTs.
+        */
+       for (picked = -1, i = 0; i < lo->ldo_mirror_count; i++) {
+               if (!lo->ldo_mirrors[i].lme_stale) {
+                       picked = i;
                        break;
+               }
        }
+       if (picked < 0) /* failed to pick a primary */
+               RETURN(-ENODATA);
 
-       if (need_create)
-               lod_obj_inc_layout_gen(lo);
-       else
-               GOTO(unlock, rc = -EALREADY);
+       CDEBUG(D_LAYOUT, DFID": picked mirror %u as primary\n",
+              PFID(lod_object_fid(lo)), lo->ldo_mirrors[picked].lme_id);
 
-       if (!rc) {
-               info->lti_buf.lb_len = lod_comp_md_size(lo, false);
-               rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf,
-                                              XATTR_NAME_LOV, 0, th);
+       /* stale overlapping components from other mirrors */
+       lod_stale_components(lo, picked, &extent);
+
+       /* instantiate components for the picked mirror, start from 0 */
+       extent = (struct lu_extent) { 0, layout->li_extent.e_end };
+       lod_foreach_mirror_comp(lod_comp, lo, picked) {
+               if (!lu_extent_is_overlapped(&extent,
+                                            &lod_comp->llc_extent))
+                       break;
+
+               if (lod_comp_inited(lod_comp))
+                       continue;
+
+               /* i still equals picked here: the pick loop above was
+                * left through its break */
+               CDEBUG(D_LAYOUT, "instantiate: %u / %u\n",
+                      i, lod_comp_index(lo, lod_comp));
+
+               /* remember the component index for the instantiation
+                * declared below */
+               info->lti_comp_idx[info->lti_count++] =
+                                               lod_comp_index(lo, lod_comp);
        }
+
+       lo->ldo_flr_state = LCM_FL_WRITE_PENDING;
+
+       /* Reset the layout version once it's becoming too large.
+        * This way it can make sure that the layout version is
+        * monotonically increased in this writing era. */
+       lod_obj_inc_layout_gen(lo);
+       if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) {
+               __u32 layout_version;
+
+               /* restart from a random value limited to 16 bits */
+               cfs_get_random_bytes(&layout_version, sizeof(layout_version));
+               lo->ldo_layout_gen = layout_version & 0xffff;
+       }
+
+       rc = lod_declare_instantiate_components(env, lo, th);
+       if (rc)
+               GOTO(out, rc);
+
+       layout_attr->la_valid = LA_LAYOUT_VERSION;
+       layout_attr->la_layout_version = 0; /* set current version */
+       rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
+       if (rc)
+               GOTO(out, rc);
+
 out:
        if (rc)
                lod_object_free_striping(env, lo);
+       RETURN(rc);
+}
 
-unlock:
-       dt_write_unlock(env, next);
+/**
+ * Declare the layout update for a write to a FLR file that is already in
+ * WRITE_PENDING state.
+ *
+ * The primary is the mirror without lme_stale; having more than one is
+ * asserted against, having none yields -ENODATA. Overlapping components of
+ * the other mirrors are marked stale, the components of the primary for
+ * which lod_comp_inited() is false are recorded in info->lti_comp_idx
+ * (walking until the first component that does not overlap
+ * [0, write end)), and the component instantiation plus a
+ * LA_LAYOUT_VERSION attribute update are declared in \a th. The layout
+ * generation is bumped only after both declarations succeeded.
+ *
+ * On any failure the striping of \a lo is freed.
+ */
+static int lod_declare_update_write_pending(const struct lu_env *env,
+               struct lod_object *lo, struct md_layout_change *mlc,
+               struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lu_attr *layout_attr = &info->lti_layout_attr;
+       struct lod_layout_component *lod_comp;
+       struct lu_extent extent = { 0 };
+       int primary = -1;
+       int i;
+       int rc;
+       ENTRY;
+
+       LASSERT(lo->ldo_flr_state == LCM_FL_WRITE_PENDING);
+       LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE);
+
+       /* look for the primary mirror */
+       for (i = 0; i < lo->ldo_mirror_count; i++) {
+               if (lo->ldo_mirrors[i].lme_stale)
+                       continue;
+
+               /* a second non-stale mirror means the layout is broken */
+               LASSERTF(primary < 0, DFID " has multiple primary: %u / %u",
+                        PFID(lod_object_fid(lo)),
+                        lo->ldo_mirrors[i].lme_id,
+                        lo->ldo_mirrors[primary].lme_id);
+
+               primary = i;
+       }
+       if (primary < 0) {
+               CERROR(DFID ": doesn't have a primary mirror\n",
+                      PFID(lod_object_fid(lo)));
+               GOTO(out, rc = -ENODATA);
+       }
+
+       CDEBUG(D_LAYOUT, DFID": found primary %u\n",
+              PFID(lod_object_fid(lo)), lo->ldo_mirrors[primary].lme_id);
 
+       LASSERT(!lo->ldo_mirrors[primary].lme_stale);
+
+       /* for LAYOUT_WRITE opc, it has to do the following operations:
+        * 1. stale overlapping components from stale mirrors;
+        * 2. instantiate components of the primary mirror;
+        * 3. transfer layout version to all objects of the primary; */
+
+       if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+               LASSERT(mlc->mlc_intent != NULL);
+
+               extent = mlc->mlc_intent->li_extent;
+
+               CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n",
+                      PFID(lod_object_fid(lo)), PEXT(&extent));
+
+               /* 1. stale overlapping components */
+               lod_stale_components(lo, primary, &extent);
+
+               /* 2. find out the components need instantiating.
+                * instantiate [0, mlc->mlc_intent->e_end) */
+               extent.e_start = 0;
+               lod_foreach_mirror_comp(lod_comp, lo, primary) {
+                       if (!lu_extent_is_overlapped(&extent,
+                                                    &lod_comp->llc_extent))
+                               break;
+
+                       if (lod_comp_inited(lod_comp))
+                               continue;
+
+                       CDEBUG(D_LAYOUT, "write instantiate %d / %d\n",
+                              primary, lod_comp_index(lo, lod_comp));
+                       info->lti_comp_idx[info->lti_count++] =
+                                               lod_comp_index(lo, lod_comp);
+               }
+       }
+
+       rc = lod_declare_instantiate_components(env, lo, th);
+       if (rc)
+               GOTO(out, rc);
+
+       /* 3. declare the layout version update */
+       layout_attr->la_valid = LA_LAYOUT_VERSION;
+       layout_attr->la_layout_version = 0; /* set current version */
+       rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
+       if (rc)
+               GOTO(out, rc);
+
+       lod_obj_inc_layout_gen(lo);
+out:
+       if (rc)
+               lod_object_free_striping(env, lo);
+       RETURN(rc);
+}
+
+/**
+ * Declare a layout change for \a dt on behalf of a layout intent.
+ *
+ * Only existing regular files whose child object is not remote are
+ * accepted; anything else yields -EINVAL. Between lod_write_lock() and
+ * the unlock at the end, the striping is loaded, lod_layout_data_init()
+ * is called with the component count, and the request is dispatched on
+ * the FLR state of the file. LCM_FL_SYNC_PENDING and unknown states
+ * yield -ENOTSUPP.
+ */
+static int lod_declare_layout_change(const struct lu_env *env,
+               struct dt_object *dt, struct md_layout_change *mlc,
+               struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       int rc;
+       ENTRY;
+
+       if (!S_ISREG(dt->do_lu.lo_header->loh_attr))
+               RETURN(-EINVAL);
+       if (!dt_object_exists(dt))
+               RETURN(-EINVAL);
+       if (dt_object_remote(dt_object_child(dt)))
+               RETURN(-EINVAL);
+
+       lod_write_lock(env, dt, 0);
+
+       rc = lod_load_striping_locked(env, lo);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       LASSERT(lo->ldo_comp_cnt > 0);
+
+       rc = lod_layout_data_init(info, lo->ldo_comp_cnt);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* one declare helper per FLR state */
+       switch (lo->ldo_flr_state) {
+       case LCM_FL_RDONLY:
+               rc = lod_declare_update_rdonly(env, lo, mlc, th);
+               break;
+       case LCM_FL_WRITE_PENDING:
+               rc = lod_declare_update_write_pending(env, lo, mlc, th);
+               break;
+       case LCM_FL_NOT_FLR:
+               rc = lod_declare_update_plain(env, lo, mlc->mlc_intent,
+                                             &mlc->mlc_buf, th);
+               break;
+       case LCM_FL_SYNC_PENDING:
+       default:
+               rc = -ENOTSUPP;
+               break;
+       }
+
+out:
+       dt_write_unlock(env, dt);
        RETURN(rc);
 }
 
@@ -5204,8 +5581,17 @@ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt,
                             struct md_layout_change *mlc, struct thandle *th)
 {
        struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+       struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr;
+       struct lod_object *lo = lod_dt_obj(dt);
+       int rc;
 
-       RETURN(lod_striped_create(env, dt, attr, NULL, th));
+       rc = lod_striped_create(env, dt, attr, NULL, th);
+       if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) {
+               layout_attr->la_layout_version |= lo->ldo_layout_gen;
+               rc = lod_attr_set(env, dt, layout_attr, th);
+       }
+
+       return rc;
 }
 
 struct dt_object_operations lod_obj_ops = {
index 4943bc2..12aee55 100644 (file)
@@ -2155,7 +2155,7 @@ out:
 int lod_obj_stripe_set_inuse_cb(const struct lu_env *env,
                                struct lod_object *lo,
                                struct dt_object *dt, struct thandle *th,
-                               int stripe_idx,
+                               int comp_idx, int stripe_idx,
                                struct lod_obj_stripe_cb_data *data)
 {
        struct lod_thread_info  *info = lod_env_info(env);
@@ -2216,7 +2216,7 @@ int lod_prepare_inuse(const struct lu_env *env, struct lod_object *lo)
        struct lod_thread_info *info = lod_env_info(env);
        struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
        struct ost_pool *inuse = &info->lti_inuse_osts;
-       struct lod_obj_stripe_cb_data data;
+       struct lod_obj_stripe_cb_data data = { { 0 } };
        __u32 stripe_count = 0;
        int i;
        int rc;
@@ -2229,8 +2229,8 @@ int lod_prepare_inuse(const struct lu_env *env, struct lod_object *lo)
                return rc;
 
        data.locd_inuse = inuse;
-       return lod_obj_for_each_stripe(env, lo, NULL,
-                                      lod_obj_stripe_set_inuse_cb, &data);
+       data.locd_stripe_cb = lod_obj_stripe_set_inuse_cb;
+       return lod_obj_for_each_stripe(env, lo, NULL, &data);
 }
 
 int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
index 375675c..9382830 100644 (file)
@@ -230,6 +230,7 @@ struct lov_layout_entry {
 struct lov_mirror_entry {
        unsigned short  lre_mirror_id;
        unsigned short  lre_preferred:1,
+                       lre_stale:1,    /* set if any components is stale */
                        lre_valid:1;    /* set if at least one of components
                                         * in this mirror is valid */
        unsigned short  lre_start;      /* index to lo_entries, start index of
@@ -435,6 +436,8 @@ struct lov_page {
        struct cl_page_slice    lps_cl;
        /** layout_entry + stripe index, composed using lov_comp_index() */
        unsigned int            lps_index;
+       /* the layout gen when this page was created */
+       __u32                   lps_layout_gen;
 };
 
 /*
index fa84386..9440d4f 100644 (file)
@@ -139,6 +139,7 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
        sub_io->ci_pio = io->ci_pio;
        sub_io->ci_lock_no_expand = io->ci_lock_no_expand;
        sub_io->ci_ndelay = io->ci_ndelay;
+       sub_io->ci_layout_version = io->ci_layout_version;
 
        result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
 
@@ -215,12 +216,89 @@ static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
        RETURN(0);
 }
 
+/**
+ * Decide whether this IO must send a write intent RPC first.
+ *
+ * io->ci_write_intent is initialized to [lio->lis_pos, lio->lis_endpos) and
+ * io->ci_need_write_intent is set as follows:
+ * - IO that is not a write, truncate or mkwrite: 0;
+ * - file in LCM_FL_RDONLY or LCM_FL_SYNC_PENDING state: 1;
+ * - file in LCM_FL_WRITE_PENDING state: 1 only if a mirror other than the
+ *   preferred one still has a valid component overlapping the intent
+ *   extent, after that extent has been widened to cover every overlapping
+ *   component of the preferred mirror.
+ *
+ * \retval 0     decision stored in io->ci_need_write_intent
+ * \retval -EIO  no component of the preferred mirror covers the extent
+ */
+static int lov_io_mirror_write_intent(struct lov_io *lio,
+       struct lov_object *obj, struct cl_io *io)
+{
+       struct lov_layout_composite *comp = &obj->u.composite;
+       struct lu_extent *ext = &io->ci_write_intent;
+       struct lov_mirror_entry *lre;
+       struct lov_mirror_entry *primary;
+       struct lov_layout_entry *lle;
+       size_t count = 0;
+       ENTRY;
+
+       *ext = (typeof(*ext)) { lio->lis_pos, lio->lis_endpos };
+       io->ci_need_write_intent = 0;
+
+       /* only IO that modifies data can require a write intent */
+       if (!(io->ci_type == CIT_WRITE || cl_io_is_trunc(io) ||
+             cl_io_is_mkwrite(io)))
+               RETURN(0);
+
+       if (lov_flr_state(obj) == LCM_FL_RDONLY ||
+           lov_flr_state(obj) == LCM_FL_SYNC_PENDING) {
+               io->ci_need_write_intent = 1;
+               RETURN(0);
+       }
+
+       LASSERT((lov_flr_state(obj) == LCM_FL_WRITE_PENDING));
+       LASSERT(comp->lo_preferred_mirror >= 0);
+
+       /* widen the intent extent to the union of all components of the
+        * primary mirror overlapping it; several components may cover
+        * the writing extent */
+       primary = &comp->lo_mirrors[comp->lo_preferred_mirror];
+       LASSERT(!primary->lre_stale);
+       lov_foreach_mirror_layout_entry(obj, lle, primary) {
+               LASSERT(lle->lle_valid);
+               if (!lu_extent_is_overlapped(ext, lle->lle_extent))
+                       continue;
+
+               ext->e_start = MIN(ext->e_start, lle->lle_extent->e_start);
+               ext->e_end = MAX(ext->e_end, lle->lle_extent->e_end);
+               ++count;
+       }
+       if (count == 0) {
+               CERROR(DFID ": cannot find any valid components covering "
+                      "file extent "DEXT", mirror: %d\n",
+                      PFID(lu_object_fid(lov2lu(obj))), PEXT(ext),
+                      primary->lre_mirror_id);
+               RETURN(-EIO);
+       }
+
+       /* count the other mirrors that still hold a valid component
+        * overlapping the widened extent (at most one hit per mirror) */
+       count = 0;
+       lov_foreach_mirror_entry(obj, lre) {
+               if (lre == primary)
+                       continue;
+
+               lov_foreach_mirror_layout_entry(obj, lle, lre) {
+                       if (!lle->lle_valid)
+                               continue;
+
+                       if (lu_extent_is_overlapped(ext, lle->lle_extent)) {
+                               ++count;
+                               break;
+                       }
+               }
+       }
+
+       /* count is a size_t, so it is printed with %zu */
+       CDEBUG(D_VFSTRACE, DFID "there are %zu components to be staled to "
+              "modify file extent "DEXT", iot: %d\n",
+              PFID(lu_object_fid(lov2lu(obj))), count, PEXT(ext), io->ci_type);
+
+       io->ci_need_write_intent = count > 0;
+
+       RETURN(0);
+}
+
 static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
                               struct cl_io *io)
 {
        struct lov_layout_composite *comp = &obj->u.composite;
        int index;
        int i;
+       int result;
        ENTRY;
 
        if (!lov_is_flr(obj)) {
@@ -230,6 +308,22 @@ static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
                RETURN(0);
        }
 
+       result = lov_io_mirror_write_intent(lio, obj, io);
+       if (result)
+               RETURN(result);
+
+       if (io->ci_need_write_intent) {
+               CDEBUG(D_VFSTRACE, DFID " need write intent for [%llu, %llu)\n",
+                      PFID(lu_object_fid(lov2lu(obj))),
+                      lio->lis_pos, lio->lis_endpos);
+
+               /* stop cl_io_init() loop */
+               RETURN(1);
+       }
+
+       /* transfer the layout version for verification */
+       io->ci_layout_version = obj->lo_lsm->lsm_layout_gen;
+
        if (io->ci_ndelay_tried == 0 || /* first time to try */
            /* reset the mirror index if layout has changed */
            lio->lis_mirror_layout_gen != obj->lo_lsm->lsm_layout_gen) {
@@ -333,7 +427,7 @@ static int lov_io_slice_init(struct lov_io *lio,
                         * the current file-tail exactly. */
                        if (unlikely(obj->lo_lsm->lsm_entries[0]->lsme_pattern &
                                     LOV_PATTERN_F_HOLE))
-                               RETURN(-EIO);
+                               GOTO(out, result = -EIO);
 
                        lio->lis_pos = 0;
                        lio->lis_endpos = OBD_OBJECT_EOF;
@@ -378,7 +472,8 @@ static int lov_io_slice_init(struct lov_io *lio,
 
                if (lov_flr_state(obj) == LCM_FL_RDONLY &&
                    !OBD_FAIL_CHECK(OBD_FAIL_FLR_GLIMPSE_IMMUTABLE))
-                       RETURN(1); /* SoM is accurate, no need glimpse */
+                       /* SoM is accurate, no need glimpse */
+                       GOTO(out, result = 1);
                break;
 
         case CIT_MISC:
@@ -392,12 +487,12 @@ static int lov_io_slice_init(struct lov_io *lio,
 
        result = lov_io_mirror_init(lio, obj, io);
        if (result)
-               RETURN(result);
+               GOTO(out, result);
 
        /* check if it needs to instantiate layout */
        if (!(io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io) ||
              (cl_io_is_trunc(io) && io->u.ci_setattr.sa_attr.lvb_size > 0)))
-               RETURN(0);
+               GOTO(out, result = 0);
 
        ext.e_start = lio->lis_pos;
        ext.e_end = lio->lis_endpos;
@@ -414,12 +509,13 @@ static int lov_io_slice_init(struct lov_io *lio,
                if (!lsm_entry_inited(obj->lo_lsm, index)) {
                        io->ci_need_write_intent = 1;
                        io->ci_write_intent = ext;
-                       result = 1;
-                       break;
+                       GOTO(out, result = 1);
                }
        }
+       EXIT;
 
-       RETURN(result);
+out:
+       return result;
 }
 
 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
@@ -836,6 +932,10 @@ static int lov_io_read_ahead(const struct lu_env *env,
        if (index < 0 || !lsm_entry_inited(loo->lo_lsm, index))
                RETURN(-ENODATA);
 
+       /* avoid readahead to expand to stale components */
+       if (!lov_entry(loo, index)->lle_valid)
+               RETURN(-EIO);
+
        stripe = lov_stripe_number(loo->lo_lsm, index, offset);
 
        r0 = lov_r0(loo, index);
index d8d479b..15d5c3c 100644 (file)
@@ -699,6 +699,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                if (i > 0) {
                        if (mirror_id == lre->lre_mirror_id) {
                                lre->lre_valid |= lle->lle_valid;
+                               lre->lre_stale |= !lle->lle_valid;
                                lre->lre_end = i;
                                continue;
                        }
@@ -719,6 +720,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                lre->lre_preferred = (lle->lle_lsme->lsme_flags &
                                        LCME_FL_PREFERRED);
                lre->lre_valid = lle->lle_valid;
+               lre->lre_stale = !lle->lle_valid;
        }
 
        /* sanity check for FLR */
@@ -758,7 +760,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
        mirror_count = 0, i = 0;
        lov_foreach_mirror_entry(lov, lre) {
                i++;
-               if (!lre->lre_valid)
+               if (lre->lre_stale)
                        continue;
 
                mirror_count++; /* valid mirror */
index 5ab3da5..fa7aea7 100644 (file)
@@ -56,8 +56,8 @@ static int lov_comp_page_print(const struct lu_env *env,
        struct lov_page *lp = cl2lov_page(slice);
 
        return (*printer)(env, cookie,
-                         LUSTRE_LOV_NAME"-page@%p, comp index: %x\n",
-                         lp, lp->lps_index);
+                         LUSTRE_LOV_NAME"-page@%p, comp index: %x, gen: %u\n",
+                         lp, lp->lps_index, lp->lps_layout_gen);
 }
 
 static const struct cl_page_operations lov_comp_page_ops = {
@@ -96,6 +96,7 @@ int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj,
        LASSERT(rc == 0);
 
        lpg->lps_index = lov_comp_index(entry, stripe);
+       lpg->lps_layout_gen = loo->lo_lsm->lsm_layout_gen;
        cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_comp_page_ops);
 
        sub = lov_sub_get(env, lio, lpg->lps_index);
index c45c86b..6eec728 100644 (file)
@@ -1865,41 +1865,190 @@ static int mdd_declare_layout_change(const struct lu_env *env,
 }
 
 /* For PFL, this is used to instantiate necessary component objects. */
-int mdd_layout_change(const struct lu_env *env, struct md_object *obj,
-                     struct md_layout_change *mlc)
+/*
+ * Runs the declare, start and execute steps inside the caller-supplied
+ * \a handle; nothing in this function stops the transaction.
+ * Returns 0 without starting the transaction when the declaration reports
+ * -EALREADY, otherwise the result of the first failing step or of the
+ * final CL_LAYOUT changelog store.
+ */
+static int
+mdd_layout_instantiate_component(const struct lu_env *env,
+               struct mdd_object *obj, struct md_layout_change *mlc,
+               struct thandle *handle)
 {
-       struct mdd_object *mdd_obj = md2mdd_obj(obj);
-       struct mdd_device *mdd = mdo2mdd(obj);
-       struct thandle *handle;
+       struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
        int rc;
        ENTRY;
 
-       handle = mdd_trans_create(env, mdd);
-       if (IS_ERR(handle))
-               RETURN(PTR_ERR(handle));
-
-       rc = mdd_declare_layout_change(env, mdd, mdd_obj, mlc, handle);
+       rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
        /**
         * It's possible that another layout write intent has already
         * instantiated our objects, so a -EALREADY returned, and we need to
         * do nothing.
         */
        if (rc)
-               GOTO(stop, rc = (rc == -EALREADY) ? 0 : rc);
+               RETURN(rc == -EALREADY ? 0 : rc);
 
        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
-               GOTO(stop, rc);
+               RETURN(rc);
 
-       mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-       rc = mdo_layout_change(env, mdd_obj, mlc, handle);
-       mdd_write_unlock(env, mdd_obj);
+       /* the layout is changed between write lock and unlock of obj */
+       mdd_write_lock(env, obj, MOR_TGT_CHILD);
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       mdd_write_unlock(env, obj);
        if (rc)
-               GOTO(stop, rc);
+               RETURN(rc);
 
-       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, mdd_obj, handle);
-stop:
-       RETURN(mdd_trans_stop(env, mdd, rc, handle));
+       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle);
+       RETURN(rc);
+}
+
+/**
+ * Move a FLR file from RDONLY to WRITE_PENDING inside \a handle.
+ *
+ * The layout change picks the primary mirror, bumps the layout version and
+ * stores the layout version xattr on the OST objects, all in a sync tx.
+ * Clients send the layout version of the file along with their write RPCs,
+ * which lets the OSTs tell legitimate writes from those of evicted clients
+ * (phantom writers).
+ *
+ * The XATTR_NAME_SOM xattr is removed as well (a missing one is not an
+ * error) and a CL_FLRW changelog record is stored.
+ */
+static int
+mdd_layout_update_rdonly(const struct lu_env *env, struct mdd_object *obj,
+                        struct md_layout_change *mlc, struct thandle *handle)
+{
+       struct mdd_device *dev = mdd_obj2mdd_dev(obj);
+       int rc;
+       ENTRY;
+
+       /* declaration phase: layout, SOM xattr removal, changelog */
+       rc = mdd_declare_layout_change(env, dev, obj, mlc, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = mdd_declare_xattr_del(env, dev, obj, XATTR_NAME_SOM, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* the changelog record is what a data mover consumes */
+       rc = mdd_declare_changelog_store(env, dev, NULL, NULL, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = mdd_trans_start(env, dev, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* FLR only works properly with a sync tx */
+       handle->th_sync = 1;
+
+       mdd_write_lock(env, obj, MOR_TGT_CHILD);
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       if (rc == 0) {
+               rc = mdo_xattr_del(env, obj, XATTR_NAME_SOM, handle);
+               /* there may be no SOM xattr to delete */
+               rc = (rc == -ENODATA) ? 0 : rc;
+       }
+       mdd_write_unlock(env, obj);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = mdd_changelog_data_store(env, dev, CL_FLRW, 0, obj, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       EXIT;
+
+out:
+       return rc;
+}
+
+/**
+ * Apply a layout write intent to a FLR file already in WRITE_PENDING
+ * state: declare the change, start \a handle as a sync tx and run
+ * mdo_layout_change() between write lock and unlock of \a obj.
+ */
+static int
+mdd_layout_update_write_pending(const struct lu_env *env,
+               struct mdd_object *obj, struct md_layout_change *mlc,
+               struct thandle *handle)
+{
+       struct mdd_device *dev = mdd_obj2mdd_dev(obj);
+       int rc;
+       ENTRY;
+
+       rc = mdd_declare_layout_change(env, dev, obj, mlc, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = mdd_trans_start(env, dev, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* FLR only works properly with a sync tx */
+       handle->th_sync = 1;
+
+       mdd_write_lock(env, obj, MOR_TGT_CHILD);
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       mdd_write_unlock(env, obj);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       EXIT;
+
+out:
+       return rc;
+}
+
+/**
+ * Layout change callback for object.
+ *
+ * Only MD_LAYOUT_WRITE is handled; any other opcode yields -ENOTSUPP.
+ * The LOV EA of the object is read to find the FLR state of the file, and
+ * the request is dispatched to the matching helper inside a transaction
+ * created here. In the future, it can be extended to handle all layout
+ * change.
+ *
+ * \retval 0        success, or nothing to do for the current FLR state
+ * \retval -EINVAL  the object has no LOV EA or it is not a composite layout
+ * \retval negative other failure, including a failure to stop the
+ *                  transaction when everything else succeeded
+ */
+static int
+mdd_layout_change(const struct lu_env *env, struct md_object *o,
+                 struct md_layout_change *mlc)
+{
+       struct mdd_object       *obj = md2mdd_obj(o);
+       struct mdd_device       *mdd = mdd_obj2mdd_dev(obj);
+       struct lu_buf           *buf = mdd_buf_get(env, NULL, 0);
+       struct lov_comp_md_v1   *lcm;
+       struct thandle          *handle;
+       int flr_state;
+       int stop_rc;
+       int rc;
+       ENTRY;
+
+       if (mlc->mlc_opc != MD_LAYOUT_WRITE)
+               RETURN(-ENOTSUPP);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       rc = mdd_get_lov_ea(env, obj, buf);
+       if (rc < 0) {
+               /* a file without layout cannot take a layout intent */
+               if (rc == -ENODATA)
+                       rc = -EINVAL;
+               GOTO(out, rc);
+       }
+
+       /* analyze the layout to make sure it's a FLR file */
+       lcm = buf->lb_buf;
+       if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
+               GOTO(out, rc = -EINVAL);
+
+       flr_state = le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK;
+
+       /* please refer to HLD of FLR for state transition */
+       switch (flr_state) {
+       case LCM_FL_NOT_FLR:
+               rc = mdd_layout_instantiate_component(env, obj, mlc, handle);
+               break;
+       case LCM_FL_WRITE_PENDING:
+               rc = mdd_layout_update_write_pending(env, obj, mlc, handle);
+               break;
+       case LCM_FL_RDONLY:
+               rc = mdd_layout_update_rdonly(env, obj, mlc, handle);
+               break;
+       case LCM_FL_SYNC_PENDING:
+       default:
+               rc = 0;
+               break;
+       }
+       EXIT;
+
+out:
+       /* do not lose a transaction stop failure; an earlier error
+        * still takes precedence */
+       stop_rc = mdd_trans_stop(env, mdd, rc, handle);
+       if (rc == 0)
+               rc = stop_rc;
+       lu_buf_free(buf);
+       return rc;
 }
 
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
index 480a9f5..3f9d45b 100644 (file)
@@ -252,6 +252,7 @@ void lustre_swab_llog_rec(struct llog_rec_hdr *rec)
                                (struct llog_setattr64_rec_v2 *)rec;
 
                        __swab32s(&lsr2->lsr_projid);
+                       __swab32s(&lsr2->lsr_layout_version);
                        tail = &lsr2->lsr_tail;
                } else {
                        tail = &lsr->lsr_tail;
index 6c0abd0..8b498c0 100644 (file)
@@ -1367,7 +1367,6 @@ static int ofd_setattr_hdl(struct tgt_session_info *tsi)
        struct ost_body         *repbody;
        struct ldlm_resource    *res;
        struct ofd_object       *fo;
-       struct filter_fid       *ff = NULL;
        int                      rc = 0;
 
        ENTRY;
@@ -1407,13 +1406,8 @@ static int ofd_setattr_hdl(struct tgt_session_info *tsi)
        la_from_obdo(&fti->fti_attr, &body->oa, body->oa.o_valid);
        fti->fti_attr.la_valid &= ~LA_TYPE;
 
-       if (body->oa.o_valid & OBD_MD_FLFID) {
-               ff = &fti->fti_mds_fid;
-               ofd_prepare_fidea(ff, &body->oa);
-       }
-
        /* setting objects attributes (including owner/group) */
-       rc = ofd_attr_set(tsi->tsi_env, fo, &fti->fti_attr, ff);
+       rc = ofd_attr_set(tsi->tsi_env, fo, &fti->fti_attr, &body->oa);
        if (rc != 0)
                GOTO(out_put, rc);
 
@@ -2017,7 +2011,6 @@ static int ofd_punch_hdl(struct tgt_session_info *tsi)
        struct ldlm_namespace   *ns = tsi->tsi_tgt->lut_obd->obd_namespace;
        struct ldlm_resource    *res;
        struct ofd_object       *fo;
-       struct filter_fid       *ff = NULL;
        __u64                    flags = 0;
        struct lustre_handle     lh = { 0, };
        int                      rc;
@@ -2078,13 +2071,8 @@ static int ofd_punch_hdl(struct tgt_session_info *tsi)
        info->fti_attr.la_size = start;
        info->fti_attr.la_valid |= LA_SIZE;
 
-       if (oa->o_valid & OBD_MD_FLFID) {
-               ff = &info->fti_mds_fid;
-               ofd_prepare_fidea(ff, oa);
-       }
-
        rc = ofd_object_punch(tsi->tsi_env, fo, start, end, &info->fti_attr,
-                             ff, (struct obdo *)oa);
+                             (struct obdo *)oa);
        if (rc)
                GOTO(out_put, rc);
 
index 9c7a582..d303646 100644 (file)
@@ -325,6 +325,8 @@ int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd);
 int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd);
 int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo,
                  struct obdo *oa);
+int ofd_verify_layout_version(const struct lu_env *env,
+                             struct ofd_object *fo, const struct obdo *oa);
 int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp,
               struct obdo *oa, int objcount, struct obd_ioobj *obj,
               struct niobuf_remote *rnb, int *nr_local,
@@ -358,6 +360,8 @@ struct ofd_object *ofd_object_find(const struct lu_env *env,
                                   struct ofd_device *ofd,
                                   const struct lu_fid *fid);
 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo);
+int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
+                        const struct obdo *oa, struct filter_fid *ff);
 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
                          u64 id, struct ofd_seq *oseq, int nr, int sync);
 
@@ -367,10 +371,10 @@ static inline void ofd_object_put(const struct lu_env *env,
        dt_object_put(env, &fo->ofo_obj);
 }
 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
-                struct lu_attr *la, struct filter_fid *ff);
+                struct lu_attr *la, struct obdo *oa);
 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                     __u64 start, __u64 end, struct lu_attr *la,
-                    struct filter_fid *ff, struct obdo *oa);
+                    struct obdo *oa);
 int ofd_destroy(const struct lu_env *, struct ofd_object *, int);
 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
                 struct lu_attr *la);
@@ -485,23 +489,6 @@ static inline void ofd_slc_set(struct ofd_device *ofd)
                ofd->ofd_lut.lut_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
 }
 
-static inline void ofd_prepare_fidea(struct filter_fid *ff,
-                                    const struct obdo *oa)
-{
-       /* packing fid and converting it to LE for storing into EA.
-        * Here ->o_stripe_idx should be filled by LOV and rest of
-        * fields - by client. */
-       ff->ff_parent.f_seq = cpu_to_le64(oa->o_parent_seq);
-       ff->ff_parent.f_oid = cpu_to_le32(oa->o_parent_oid);
-       /* XXX: we are ignoring o_parent_ver here, since this should
-        *      be the same for all objects in this fileset. */
-       ff->ff_parent.f_ver = cpu_to_le32(oa->o_stripe_idx);
-       if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
-               ost_layout_cpu_to_le(&ff->ff_layout, &oa->o_layout);
-       else
-               memset(&ff->ff_layout, 0, sizeof(ff->ff_layout));
-}
-
 static inline int ofd_validate_seq(struct obd_export *exp, __u64 seq)
 {
        struct filter_export_data *fed = &exp->exp_filter_data;
index 924bffa..2c51f01 100644 (file)
@@ -427,6 +427,49 @@ int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo,
 }
 
 /**
+ * FLR: verify the layout version of object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] fo       OFD object
+ * \param[in] oa       OBDO structure with layout version
+ *
+ * \retval             0 on successful verification
+ * \retval             -EINPROGRESS layout version is in transfer
+ * \retval             -ESTALE the layout version on client is stale
+ */
+int ofd_verify_layout_version(const struct lu_env *env,
+                             struct ofd_object *fo, const struct obdo *oa)
+{
+       int rc;
+       ENTRY;
+
+       rc = ofd_object_ff_load(env, fo);
+       if (rc < 0) {
+               if (rc == -ENODATA)
+                       rc = -EINPROGRESS;
+               GOTO(out, rc);
+       }
+
+       /* this update is not legitimate */
+       if (oa->o_layout_version < fo->ofo_ff.ff_layout_version)
+               GOTO(out, rc = -ESTALE);
+
+       /* layout version is not transmitted yet */
+       if (oa->o_layout_version >
+           fo->ofo_ff.ff_layout_version + fo->ofo_ff.ff_range)
+               GOTO(out, rc = -EINPROGRESS);
+
+       EXIT;
+
+out:
+       CDEBUG(D_INODE, DFID " verify layout version: %u vs. %u, rc: %d\n",
+              PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
+              fo->ofo_ff.ff_layout_version, oa->o_layout_version, rc);
+       return rc;
+
+}
+
+/**
  * Prepare buffers for read request processing.
  *
  * This function converts remote buffers from client to local buffers
@@ -628,6 +671,18 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                }
        }
 
+       /* need to verify layout version */
+       if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+               rc = ofd_verify_layout_version(env, fo, oa);
+               if (rc) {
+                       ofd_read_unlock(env, fo);
+                       ofd_object_put(env, fo);
+                       GOTO(out, rc);
+               }
+
+               oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+       }
+
        /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
         * space back if possible */
        tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
@@ -817,7 +872,7 @@ ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd,
  * \param[in] ofd      OFD device
  * \param[in] ofd_obj  OFD object
  * \param[in] la       object attributes
- * \param[in] ff       parent FID
+ * \param[in] oa       obdo
  *
  * \retval             0 on successful attributes update
  * \retval             negative value on error
@@ -825,14 +880,15 @@ ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd,
 static int
 ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
                   struct ofd_object *ofd_obj, struct lu_attr *la,
-                  struct filter_fid *ff)
+                  struct obdo *oa)
 {
        struct ofd_thread_info  *info = ofd_info(env);
+       struct filter_fid       *ff = &info->fti_mds_fid;
        __u64                    valid = la->la_valid;
-       int                      rc;
        struct thandle          *th;
        struct dt_object        *dt_obj;
-       int                      ff_needed = 0;
+       int                      fl = 0;
+       int                      rc;
 
        ENTRY;
 
@@ -847,15 +903,11 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
        if (rc != 0)
                GOTO(out, rc);
 
-       if (ff != NULL) {
-               rc = ofd_object_ff_load(env, ofd_obj);
-               if (rc == -ENODATA)
-                       ff_needed = 1;
-               else if (rc < 0)
-                       GOTO(out, rc);
-       }
+       fl = ofd_object_ff_update(env, ofd_obj, oa, ff);
+       if (fl < 0)
+               GOTO(out, rc = fl);
 
-       if (!la->la_valid && !ff_needed)
+       if (!la->la_valid && !fl)
                /* no attributes to set */
                GOTO(out, rc = 0);
 
@@ -869,14 +921,12 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
                        GOTO(out_tx, rc);
        }
 
-       if (ff_needed) {
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
                        ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
                else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
                        le32_add_cpu(&ff->ff_parent.f_oid, -1);
 
-               info->fti_buf.lb_buf = ff;
-               info->fti_buf.lb_len = sizeof(*ff);
                rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf,
                                          XATTR_NAME_FID, 0, th);
                if (rc)
@@ -896,14 +946,21 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
                        GOTO(out_tx, rc);
        }
 
-       /* set filter fid EA */
-       if (ff_needed) {
+       /* set filter fid EA.
+        * FIXME: this modifies XATTR_NAME_FID while holding only the read lock
+        * of the ofd object; the write lock should be held. It should work
+        * because write RPCs only modify ff_{parent,layout}, and that
+        * information is the same in all the write RPCs. fl is deliberately
+        * not passed to dt_xattr_set() (0 is used) so this race is tolerated. */
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
                        GOTO(out_tx, rc);
 
+               info->fti_buf.lb_buf = ff;
+               info->fti_buf.lb_len = sizeof(*ff);
                rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID,
                                  0, th);
-               if (!rc)
+               if (rc == 0)
                        filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff));
        }
 
@@ -1012,7 +1069,7 @@ static int ofd_soft_sync_cb_add(struct thandle *th, struct obd_export *exp)
 static int
 ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
                   struct ofd_device *ofd, const struct lu_fid *fid,
-                  struct lu_attr *la, struct filter_fid *ff, int objcount,
+                  struct lu_attr *la, struct obdo *oa, int objcount,
                   int niocount, struct niobuf_local *lnb,
                   unsigned long granted, int old_rc)
 {
@@ -1048,7 +1105,7 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
         * dt_declare_write_commit() since quota enforcement is now handled in
         * declare phases.
         */
-       rc = ofd_write_attr_set(env, ofd, fo, la, ff);
+       rc = ofd_write_attr_set(env, ofd, fo, la, oa);
        if (rc)
                GOTO(out, rc);
 
@@ -1203,7 +1260,6 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
        struct ofd_mod_data     *fmd;
        __u64                    valid;
        struct ofd_device       *ofd = ofd_exp(exp);
-       struct filter_fid       *ff = NULL;
        const struct lu_fid     *fid = &oa->o_oi.oi_fid;
        int                      rc = 0;
 
@@ -1227,13 +1283,8 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                ofd_fmd_put(exp, fmd);
                la_from_obdo(&info->fti_attr, oa, valid);
 
-               if (oa->o_valid & OBD_MD_FLFID) {
-                       ff = &info->fti_mds_fid;
-                       ofd_prepare_fidea(ff, oa);
-               }
-
                rc = ofd_commitrw_write(env, exp, ofd, fid, &info->fti_attr,
-                                       ff, objcount, npages, lnb,
+                                       oa, objcount, npages, lnb,
                                        oa->o_grant_used, old_rc);
                if (rc == 0)
                        obdo_from_la(oa, &info->fti_attr,
index bb4e1be..0a251f3 100644 (file)
@@ -817,7 +817,6 @@ static int ofd_echo_setattr(const struct lu_env *env, struct obd_export *exp,
        struct ldlm_resource    *res;
        struct ofd_object       *fo;
        struct lu_fid           *fid = &oa->o_oi.oi_fid;
-       struct filter_fid       *ff = NULL;
        int                      rc = 0;
 
        ENTRY;
@@ -854,13 +853,8 @@ static int ofd_echo_setattr(const struct lu_env *env, struct obd_export *exp,
        la_from_obdo(&info->fti_attr, oa, oa->o_valid);
        info->fti_attr.la_valid &= ~LA_TYPE;
 
-       if (oa->o_valid & OBD_MD_FLFID) {
-               ff = &info->fti_mds_fid;
-               ofd_prepare_fidea(ff, oa);
-       }
-
        /* setting objects attributes (including owner/group) */
-       rc = ofd_attr_set(env, fo, &info->fti_attr, ff);
+       rc = ofd_attr_set(env, fo, &info->fti_attr, oa);
        if (rc)
                GOTO(out_unlock, rc);
 
index 9f76081..ec5fb4f 100644 (file)
@@ -152,8 +152,7 @@ int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
 
        if (unlikely(rc < sizeof(struct lu_fid))) {
                fid_zero(&ff->ff_parent);
-
-               return -ENODATA;
+               return -EINVAL;
        }
 
        filter_fid_le_to_cpu(ff, ff, rc);
@@ -474,6 +473,91 @@ int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
 }
 
 /**
+ * Work out whether filter_fid must be rewritten from the values in @oa.
+ *
+ * \param[in] env      execution environment
+ * \param[in] fo       ofd object; its cached ofo_ff is the comparison base
+ * \param[in] oa       obdo from client or MDT
+ * \param[out] ff      scratch buffer (must not be &fo->ofo_ff); on a positive
+ *                     return it holds the new filter_fid, converted to LE
+ *
+ * \retval < 0         error occurred (incl. -EPERM, -EINVAL on layout version)
+ * \retval 0           doesn't need to update filter_fid
+ * \retval LU_XATTR_{CREATE,REPLACE}   flag for xattr update
+ */
+int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
+                        const struct obdo *oa, struct filter_fid *ff)
+{
+       int rc = 0;
+       ENTRY;
+
+       if (!(oa->o_valid &
+             (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
+               RETURN(0);
+
+       rc = ofd_object_ff_load(env, fo);
+       if (rc < 0 && rc != -ENODATA)
+               RETURN(rc);
+
+       LASSERT(ff != &fo->ofo_ff);
+       if (rc == -ENODATA) {
+               rc = LU_XATTR_CREATE;
+               memset(ff, 0, sizeof(*ff));
+       } else {
+               rc = LU_XATTR_REPLACE;
+               memcpy(ff, &fo->ofo_ff, sizeof(*ff));
+       }
+
+       if (oa->o_valid & OBD_MD_FLFID) {
+               /* pack the parent fid in CPU order (converted to LE below).
+                * Here ->o_stripe_idx should be filled by LOV and rest of
+                * fields - by client. */
+               ff->ff_parent.f_seq = oa->o_parent_seq;
+               ff->ff_parent.f_oid = oa->o_parent_oid;
+               /* XXX: we are ignoring o_parent_ver here, since this should
+                *      be the same for all objects in this fileset. */
+               ff->ff_parent.f_ver = oa->o_stripe_idx;
+       }
+       if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
+               ff->ff_layout = oa->o_layout;
+
+       if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+               CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
+                      PFID(&fo->ofo_ff.ff_parent),
+                      PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
+                      ff->ff_layout_version, oa->o_layout_version);
+
+               /* only the MDS has the authority to update layout version */
+               if (!(exp_connect_flags(ofd_info(env)->fti_exp) &
+                     OBD_CONNECT_MDS)) {
+                       CERROR(DFID": update layout version from client\n",
+                              PFID(&fo->ofo_ff.ff_parent));
+
+                       RETURN(-EPERM);
+               }
+
+               /* it's not allowed to change it to a smaller value */
+               if (oa->o_layout_version < ff->ff_layout_version)
+                       RETURN(-EINVAL);
+
+               if (ff->ff_layout_version == 0) {
+                       ff->ff_layout_version = oa->o_layout_version;
+                       ff->ff_range = 0;
+               } else if (oa->o_layout_version > ff->ff_layout_version) {
+                       ff->ff_range = MAX(ff->ff_range,
+                                 oa->o_layout_version - ff->ff_layout_version);
+               }
+       }
+
+       if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
+               filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
+       else /* no change */
+               rc = 0;
+
+       RETURN(rc);
+}
+
+/**
  * Set OFD object attributes.
  *
  * This function sets OFD object attributes taken from incoming request.
@@ -484,19 +568,20 @@ int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
  * \param[in] env      execution environment
  * \param[in] fo       OFD object
  * \param[in] la       object attributes
- * \param[in] ff       filter_fid structure, contains additional attributes
+ * \param[in] oa       obdo carries fid, ost_layout, layout version
  *
  * \retval             0 if successful
  * \retval             negative value on error
  */
 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
-                struct lu_attr *la, struct filter_fid *ff)
+                struct lu_attr *la, struct obdo *oa)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_device       *ofd = ofd_obj2dev(fo);
+       struct filter_fid       *ff = &info->fti_mds_fid;
        struct thandle          *th;
        struct ofd_mod_data     *fmd;
-       int                     ff_needed = 0;
+       int                     fl;
        int                     rc;
        int                     rc2;
        ENTRY;
@@ -521,13 +606,9 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
        if (rc != 0)
                GOTO(unlock, rc);
 
-       if (ff != NULL) {
-               rc = ofd_object_ff_load(env, fo);
-               if (rc == -ENODATA)
-                       ff_needed = 1;
-               else if (rc < 0)
-                       GOTO(unlock, rc);
-       }
+       fl = ofd_object_ff_update(env, fo, oa, ff);
+       if (fl < 0)
+               GOTO(unlock, rc = fl);
 
        th = ofd_trans_create(env, ofd);
        if (IS_ERR(th))
@@ -537,7 +618,7 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed) {
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
                        ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
                else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
@@ -546,7 +627,7 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
                info->fti_buf.lb_buf = ff;
                info->fti_buf.lb_len = sizeof(*ff);
                rc = dt_declare_xattr_set(env, ofd_object_child(fo),
-                                         &info->fti_buf, XATTR_NAME_FID, 0,
+                                         &info->fti_buf, XATTR_NAME_FID, fl,
                                          th);
                if (rc)
                        GOTO(stop, rc);
@@ -560,12 +641,14 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed) {
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
                        GOTO(stop, rc);
 
+               info->fti_buf.lb_buf = ff;
+               info->fti_buf.lb_len = sizeof(*ff);
                rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
-                                 XATTR_NAME_FID, 0, th);
+                                 XATTR_NAME_FID, fl, th);
                if (!rc)
                        filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
        }
@@ -599,7 +682,6 @@ unlock:
  * \param[in] start    start offset to punch from
  * \param[in] end      end of punch
  * \param[in] la       object attributes
- * \param[in] ff       filter_fid structure
  * \param[in] oa       obdo struct from incoming request
  *
  * \retval             0 if successful
@@ -607,14 +689,15 @@ unlock:
  */
 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                     __u64 start, __u64 end, struct lu_attr *la,
-                    struct filter_fid *ff, struct obdo *oa)
+                    struct obdo *oa)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_device       *ofd = ofd_obj2dev(fo);
        struct ofd_mod_data     *fmd;
        struct dt_object        *dob = ofd_object_child(fo);
+       struct filter_fid       *ff = &info->fti_mds_fid;
        struct thandle          *th;
-       int                     ff_needed = 0;
+       int                     fl;
        int                     rc;
        int                     rc2;
 
@@ -638,6 +721,15 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                        GOTO(unlock, rc);
        }
 
+       /* need to verify layout version */
+       if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+               rc = ofd_verify_layout_version(env, fo, oa);
+               if (rc)
+                       GOTO(unlock, rc);
+
+               oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+       }
+
        /* VBR: version recovery check */
        rc = ofd_version_get_check(info, fo);
        if (rc)
@@ -647,13 +739,9 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
        if (rc != 0)
                GOTO(unlock, rc);
 
-       if (ff != NULL) {
-               rc = ofd_object_ff_load(env, fo);
-               if (rc == -ENODATA)
-                       ff_needed = 1;
-               else if (rc < 0)
-                       GOTO(unlock, rc);
-       }
+       fl = ofd_object_ff_update(env, fo, oa, ff);
+       if (fl < 0)
+               GOTO(unlock, rc = fl);
 
        th = ofd_trans_create(env, ofd);
        if (IS_ERR(th))
@@ -667,7 +755,7 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed) {
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
                        ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
                else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
@@ -676,7 +764,7 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                info->fti_buf.lb_buf = ff;
                info->fti_buf.lb_len = sizeof(*ff);
                rc = dt_declare_xattr_set(env, ofd_object_child(fo),
-                                         &info->fti_buf, XATTR_NAME_FID, 0,
+                                         &info->fti_buf, XATTR_NAME_FID, fl,
                                          th);
                if (rc)
                        GOTO(stop, rc);
@@ -694,12 +782,12 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed) {
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
                        GOTO(stop, rc);
 
                rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
-                                 XATTR_NAME_FID, 0, th);
+                                 XATTR_NAME_FID, fl, th);
                if (!rc)
                        filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
        }
index 101e666..f94f053 100644 (file)
@@ -2533,6 +2533,9 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                ++ext->oe_nr_pages;
                list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
                osc_object_unlock(osc);
+
+               if (!ext->oe_layout_version)
+                       ext->oe_layout_version = io->ci_layout_version;
        }
 
        RETURN(rc);
@@ -2720,8 +2723,9 @@ int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
        RETURN(rc);
 }
 
-int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
-                        struct list_head *list, int brw_flags)
+int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+                        struct osc_object *obj, struct list_head *list,
+                        int brw_flags)
 {
        struct client_obd     *cli = osc_cli(obj);
        struct osc_extent     *ext;
@@ -2771,6 +2775,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
        ext->oe_nr_pages = page_count;
        ext->oe_mppr = mppr;
        list_splice_init(list, &ext->oe_pages);
+       ext->oe_layout_version = io->ci_layout_version;
 
        osc_object_lock(obj);
        /* Reuse the initial refcount for RPC, don't drop it */
index 6650f0a..5ccff72 100644 (file)
@@ -187,7 +187,7 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
 
                if (++queued == max_pages) {
                        queued = 0;
-                       result = osc_queue_sync_pages(env, osc, &list,
+                       result = osc_queue_sync_pages(env, io, osc, &list,
                                                      brw_flags);
                        if (result < 0)
                                break;
@@ -195,7 +195,7 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
        }
 
        if (queued > 0)
-               result = osc_queue_sync_pages(env, osc, &list, brw_flags);
+               result = osc_queue_sync_pages(env, io, osc, &list, brw_flags);
 
        /* Update c/mtime for sync write. LU-7310 */
        if (crt == CRT_WRITE && qout->pl_nr > 0 && result == 0) {
@@ -556,6 +556,12 @@ static int osc_io_setattr_start(const struct lu_env *env,
                                 oa->o_flags = OBD_FL_SRVLOCK;
                                 oa->o_valid |= OBD_MD_FLFLAGS;
                         }
+
+                       if (io->ci_layout_version > 0) {
+                               /* verify layout version */
+                               oa->o_valid |= OBD_MD_LAYOUT_VERSION;
+                               oa->o_layout_version = io->ci_layout_version;
+                       }
                 } else {
                         LASSERT(oio->oi_lockless == 0);
                 }
index 9cb2c6d..6d7929d 100644 (file)
@@ -1932,6 +1932,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        int                             i;
        int                             grant = 0;
        int                             rc;
+       __u32                           layout_version = 0;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ost_body                 *body;
        ENTRY;
@@ -1943,6 +1944,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                mem_tight |= ext->oe_memalloc;
                grant += ext->oe_grants;
                page_count += ext->oe_nr_pages;
+               layout_version = MAX(layout_version, ext->oe_layout_version);
                if (obj == NULL)
                        obj = ext->oe_obj;
        }
@@ -2000,8 +2002,16 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        crattr->cra_oa = oa;
        cl_req_attr_set(env, osc2cl(obj), crattr);
 
-       if (cmd == OBD_BRW_WRITE)
+       if (cmd == OBD_BRW_WRITE) {
                oa->o_grant_used = grant;
+               if (layout_version > 0) {
+                       CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
+                              PFID(&oa->o_oi.oi_fid), layout_version);
+
+                       oa->o_layout_version = layout_version;
+                       oa->o_valid |= OBD_MD_LAYOUT_VERSION;
+               }
+       }
 
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
index 446f55a..d67de46 100644 (file)
@@ -667,10 +667,10 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
        }
 
-       if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+       if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
                RETURN(0);
 
-       /* track all UID/GID changes via llog */
+       /* track all UID/GID, projid, and layout version changes via llog */
        rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
 
        return 0;
@@ -704,8 +704,8 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
        int                      rc = 0;
        ENTRY;
 
-       /* we're interested in uid/gid/projid changes only */
-       if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+       /* we're interested in uid/gid/projid/layout version changes only */
+       if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
                RETURN(0);
 
        if (!is_only_remote_trans(th)) {
index 80c11c2..47ec49c 100644 (file)
@@ -427,11 +427,14 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                LASSERT(attr);
                osi->osi_setattr.lsr_uid = attr->la_uid;
                osi->osi_setattr.lsr_gid = attr->la_gid;
+               osi->osi_setattr.lsr_layout_version = attr->la_layout_version;
                osi->osi_setattr.lsr_projid = attr->la_projid;
                osi->osi_setattr.lsr_valid =
                        ((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
                        ((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
                        ((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
+               if (attr->la_valid & LA_LAYOUT_VERSION)
+                       osi->osi_setattr.lsr_valid |= OBD_MD_LAYOUT_VERSION;
                break;
        default:
                LBUG();
@@ -745,7 +748,7 @@ static int osp_sync_new_setattr_job(struct osp_device *d,
        /* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set,
         * so no bits other than these should be set. */
        if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
-           OBD_MD_FLPROJID)) != 0) {
+           OBD_MD_FLPROJID | OBD_MD_LAYOUT_VERSION)) != 0) {
                CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
                        d->opd_obd->obd_name, rec->lsr_valid);
                /* return 1 on invalid record */
@@ -762,9 +765,11 @@ static int osp_sync_new_setattr_job(struct osp_device *d,
        body->oa.o_uid = rec->lsr_uid;
        body->oa.o_gid = rec->lsr_gid;
        body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
-       if (h->lrh_len > sizeof(struct llog_setattr64_rec))
-               body->oa.o_projid = ((struct llog_setattr64_rec_v2 *)
-                                     rec)->lsr_projid;
+       if (h->lrh_len > sizeof(struct llog_setattr64_rec)) {
+               struct llog_setattr64_rec_v2 *rec_v2 = (typeof(rec_v2))rec;
+               body->oa.o_projid = rec_v2->lsr_projid;
+               body->oa.o_layout_version = rec_v2->lsr_layout_version;
+       }
 
        /* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
         * we assume that both UID and GID are valid in that case. */
@@ -773,6 +778,12 @@ static int osp_sync_new_setattr_job(struct osp_device *d,
        else
                body->oa.o_valid |= rec->lsr_valid;
 
+       if (body->oa.o_valid & OBD_MD_LAYOUT_VERSION) {
+               OBD_FAIL_TIMEOUT(OBD_FAIL_FLR_LV_DELAY, cfs_fail_val);
+               if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_FLR_LV_INC)))
+                       ++body->oa.o_layout_version;
+       }
+
        osp_sync_send_new_rpc(d, llh, h, req);
        RETURN(0);
 }
index 2f07e36..50b2d6b 100644 (file)
@@ -1728,7 +1728,7 @@ void lustre_swab_obdo (struct obdo  *o)
        __swab32s(&o->o_stripe_idx);
        __swab32s(&o->o_parent_ver);
        lustre_swab_ost_layout(&o->o_layout);
-       CLASSERT(offsetof(typeof(*o), o_padding_3) != 0);
+       __swab32s(&o->o_layout_version);
        __swab32s(&o->o_uid_h);
        __swab32s(&o->o_gid_h);
        __swab64s(&o->o_data_version);
index ec8b4d8..22c0912 100644 (file)
@@ -1434,10 +1434,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct obdo, o_layout));
        LASSERTF((int)sizeof(((struct obdo *)0)->o_layout) == 28, "found %lld\n",
                 (long long)(int)sizeof(((struct obdo *)0)->o_layout));
-       LASSERTF((int)offsetof(struct obdo, o_padding_3) == 164, "found %lld\n",
-                (long long)(int)offsetof(struct obdo, o_padding_3));
-       LASSERTF((int)sizeof(((struct obdo *)0)->o_padding_3) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct obdo *)0)->o_padding_3));
+       LASSERTF((int)offsetof(struct obdo, o_layout_version) == 164, "found %lld\n",
+                (long long)(int)offsetof(struct obdo, o_layout_version));
+       LASSERTF((int)sizeof(((struct obdo *)0)->o_layout_version) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct obdo *)0)->o_layout_version));
        LASSERTF((int)offsetof(struct obdo, o_uid_h) == 168, "found %lld\n",
                 (long long)(int)offsetof(struct obdo, o_uid_h));
        LASSERTF((int)sizeof(((struct obdo *)0)->o_uid_h) == 4, "found %lld\n",
index a2e0c10..b5fbf88 100644 (file)
@@ -640,7 +640,7 @@ check_obdo(void)
        CHECK_MEMBER(obdo, o_parent_ver);
        CHECK_MEMBER(obdo, o_handle);
        CHECK_MEMBER(obdo, o_layout);
-       CHECK_MEMBER(obdo, o_padding_3);
+       CHECK_MEMBER(obdo, o_layout_version);
        CHECK_MEMBER(obdo, o_uid_h);
        CHECK_MEMBER(obdo, o_gid_h);
        CHECK_MEMBER(obdo, o_data_version);
index 0cbe43c..c0bb3fd 100644 (file)
@@ -1453,10 +1453,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct obdo, o_layout));
        LASSERTF((int)sizeof(((struct obdo *)0)->o_layout) == 28, "found %lld\n",
                 (long long)(int)sizeof(((struct obdo *)0)->o_layout));
-       LASSERTF((int)offsetof(struct obdo, o_padding_3) == 164, "found %lld\n",
-                (long long)(int)offsetof(struct obdo, o_padding_3));
-       LASSERTF((int)sizeof(((struct obdo *)0)->o_padding_3) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct obdo *)0)->o_padding_3));
+       LASSERTF((int)offsetof(struct obdo, o_layout_version) == 164, "found %lld\n",
+                (long long)(int)offsetof(struct obdo, o_layout_version));
+       LASSERTF((int)sizeof(((struct obdo *)0)->o_layout_version) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct obdo *)0)->o_layout_version));
        LASSERTF((int)offsetof(struct obdo, o_uid_h) == 168, "found %lld\n",
                 (long long)(int)offsetof(struct obdo, o_uid_h));
        LASSERTF((int)sizeof(((struct obdo *)0)->o_uid_h) == 4, "found %lld\n",