Whamcloud - gitweb
b=19964 SOM EA
authorVitaly Fertman <Vitaly.Fertman@Sun.COM>
Tue, 22 Dec 2009 01:28:51 +0000 (04:28 +0300)
committerRobert Read <rread@sun.com>
Tue, 22 Dec 2009 01:34:02 +0000 (17:34 -0800)
SOM stores proper size, blocks, IOEpoch and mountid in EA

lustre/include/lustre/lustre_idl.h
lustre/include/md_object.h
lustre/llite/llite_close.c
lustre/mdc/mdc_lib.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/tests/sanity.sh

index ef7c346..e33d49f 100644 (file)
@@ -289,10 +289,16 @@ struct lustre_mdt_attrs {
         __u32   lma_incompat;
         /** FID of this inode */
         struct lu_fid  lma_self_fid;
-        /** SOM state, mdt/ost type, others */
+        /** mdt/ost type, others */
         __u64   lma_flags;
-        /** total sectors in objects */
-        __u64   lma_som_sectors;
+        /* IO Epoch SOM attributes belongs to */
+        __u64   lma_ioepoch;
+        /** total file size in objects */
+        __u64   lma_som_size;
+        /** total fs blocks in objects */
+        __u64   lma_som_blocks;
+        /** mds mount id the size is valid for */
+        __u64   lma_som_mountid;
 };
 
 /**
@@ -306,13 +312,16 @@ static inline void lustre_lma_init(struct lustre_mdt_attrs *lma,
         lma->lma_incompat    = 0;
         memcpy(&lma->lma_self_fid, fid, sizeof(*fid));
         lma->lma_flags       = 0;
-        lma->lma_som_sectors = 0;
+        lma->lma_ioepoch     = 0;
+        lma->lma_som_size    = 0;
+        lma->lma_som_blocks  = 0;
+        lma->lma_som_mountid = 0;
 
         /* If a field is added in struct lustre_mdt_attrs, zero it explicitly
          * and change the test below. */
         LASSERT(sizeof(*lma) ==
-                (offsetof(struct lustre_mdt_attrs, lma_som_sectors) +
-                 sizeof(lma->lma_som_sectors)));
+                (offsetof(struct lustre_mdt_attrs, lma_som_mountid) +
+                 sizeof(lma->lma_som_mountid)));
 };
 
 extern void lustre_swab_lu_fid(struct lu_fid *fid);
@@ -329,7 +338,10 @@ static inline void lustre_lma_swab(struct lustre_mdt_attrs *lma)
                 __swab32s(&lma->lma_incompat);
                 lustre_swab_lu_fid(&lma->lma_self_fid);
                 __swab64s(&lma->lma_flags);
-                __swab64s(&lma->lma_som_sectors);
+                __swab64s(&lma->lma_ioepoch);
+                __swab64s(&lma->lma_som_size);
+                __swab64s(&lma->lma_som_blocks);
+                __swab64s(&lma->lma_som_mountid);
         }
 };
 
@@ -1237,8 +1249,7 @@ enum md_op_flags {
         MF_MDC_CANCEL_FID4      = (1 << 6),
 };
 
-#define MF_SOM_LOCAL_FLAGS (MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID2 | \
-                            MF_MDC_CANCEL_FID3 | MF_MDC_CANCEL_FID4)
+#define MF_SOM_LOCAL_FLAGS (MF_SOM_CHANGE | MF_EPOCH_OPEN | MF_EPOCH_CLOSE)
 
 #define MDS_BFLAG_UNCOMMITTED_WRITES   0x1
 #define MDS_BFLAG_EXT_FLAGS     0x80000000 /* == EXT3_RESERVED_FL */
index 0e3dd26..5afe871 100644 (file)
@@ -151,6 +151,16 @@ struct md_hsm {
         __u32  mh_flags;
 };
 #define ma_hsm_flags ma_hsm.mh_flags
+#define HSM_FLAGS_MASK 0
+
+#define IOEPOCH_INVAL 0
+
+struct md_som_data {
+        __u64 msd_ioepoch;
+        __u64 msd_size;
+        __u64 msd_blocks;
+        __u64 msd_mountid;
+};
 
 struct md_attr {
         __u64                   ma_valid;
@@ -167,7 +177,7 @@ struct md_attr {
         int                     ma_cookie_size;
         struct lustre_capa     *ma_capa;
         struct md_hsm           ma_hsm;
-/* XXX:  struct md_som_data   *ma_som; */
+        struct md_som_data     *ma_som;
 };
 
 /** Additional parameters for create */
index 803f478..3fdf87a 100644 (file)
@@ -226,6 +226,7 @@ int ll_som_update(struct inode *inode, struct md_op_data *op_data)
         int rc;
         ENTRY;
 
+        LASSERT(op_data != NULL);
         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
                 CERROR("ino %lu/%u(flags %lu) som valid it just after "
                        "recovery\n", inode->i_ino, inode->i_generation,
index 83aa420..1bab2b6 100644 (file)
@@ -150,7 +150,7 @@ void mdc_create_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
         rec->cr_time     = op_data->op_mod_time;
         rec->cr_suppgid1 = op_data->op_suppgids[0];
         rec->cr_suppgid2 = op_data->op_suppgids[1];
-        rec->cr_flags    = op_data->op_flags & ~MF_SOM_LOCAL_FLAGS;
+        rec->cr_flags    = op_data->op_flags & MF_SOM_LOCAL_FLAGS;
         rec->cr_bias     = op_data->op_bias;
 
         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
@@ -312,7 +312,7 @@ static void mdc_ioepoch_pack(struct mdt_ioepoch *epoch,
 {
         memcpy(&epoch->handle, &op_data->op_handle, sizeof(epoch->handle));
         epoch->ioepoch = op_data->op_ioepoch;
-        epoch->flags = op_data->op_flags & ~MF_SOM_LOCAL_FLAGS;
+        epoch->flags = op_data->op_flags & MF_SOM_LOCAL_FLAGS;
 }
 
 void mdc_setattr_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
index ddb26db..1967031 100644 (file)
@@ -722,12 +722,14 @@ static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                         ma->ma_hsm_flags = 0;
                 ma->ma_valid |= MA_HSM;
         }
-        if (ma->ma_need & MA_SOM) {
-
-                /* XXX: Here, copy and swab SoM data, and then remove this
-                 * assert. */
-                LASSERT(!(ma->ma_need & MA_SOM));
 
+        /* Copy SOM */
+        if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
+                LASSERT(ma->ma_som != NULL);
+                ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
+                ma->ma_som->msd_size    = lma->lma_som_size;
+                ma->ma_som->msd_blocks  = lma->lma_som_blocks;
+                ma->ma_som->msd_mountid = lma->lma_som_mountid;
                 ma->ma_valid |= MA_SOM;
         }
 
@@ -1019,7 +1021,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                         struct lu_attr *la, const struct md_attr *ma)
 {
         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
-        struct md_ucred  *uc         = md_ucred(env);
+        struct md_ucred  *uc;
         int               rc;
         ENTRY;
 
@@ -1034,6 +1036,13 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                 RETURN(-EPERM);
 
+        /* export destroy does not have ->le_ses, but we may want
+         * to drop LUSTRE_SOM_FL. */
+        if (!env->le_ses)
+                RETURN(0);
+
+        uc = md_ucred(env);
+
         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
         if (rc)
                 RETURN(rc);
@@ -1287,15 +1296,15 @@ static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
 
         ENTRY;
 
-        memset(lma, 0, lmasize);
-
         /* Either HSM or SOM part is not valid, we need to read it before */
         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
-                if (rc)
+                if (rc <= 0)
                         RETURN(rc);
 
                 lustre_lma_swab(lma);
+        } else {
+                memset(lma, 0, lmasize);
         }
 
         /* Copy HSM data */
@@ -1303,12 +1312,19 @@ static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
                 lma->lma_flags  |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
                 lma->lma_compat |= LMAC_HSM;
         }
-        /* XXX: Copy SOM data */
+
+        /* Copy SOM data */
         if (ma->ma_valid & MA_SOM) {
-                /*
-                lma->lma_compat |= LMAC_SOM;
-                */
-                LASSERT(!(ma->ma_valid & MA_SOM));
+                LASSERT(ma->ma_som != NULL);
+                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
+                        lma->lma_compat     &= ~LMAC_SOM;
+                } else {
+                        lma->lma_compat     |= LMAC_SOM;
+                        lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
+                        lma->lma_som_size    = ma->ma_som->msd_size;
+                        lma->lma_som_blocks  = ma->ma_som->msd_blocks;
+                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
+                }
         }
 
         /* Copy FID */
index 417af35..6945820 100644 (file)
@@ -368,20 +368,31 @@ static int mdt_statfs(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
-void mdt_pack_size2body(struct mdt_thread_info *info, struct mdt_object *o)
+/**
+ * Pack SOM attributes into the reply.
+ * Call under a DLM UPDATE lock.
+ */
+static void mdt_pack_size2body(struct mdt_thread_info *info,
+                               struct mdt_object *mo)
 {
         struct mdt_body *b;
-        struct lu_attr *attr = &info->mti_attr.ma_attr;
+        struct md_attr *ma = &info->mti_attr;
 
+        LASSERT(ma->ma_attr.la_valid & LA_MODE);
         b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
-        /* Check if Size-on-MDS is enabled. */
-        if ((mdt_conn_flags(info) & OBD_CONNECT_SOM) &&
-            S_ISREG(attr->la_mode) && mdt_object_is_som_enabled(o)) {
-                b->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
-                b->size = attr->la_size;
-                b->blocks = attr->la_blocks;
-        }
+        /* Check if Size-on-MDS is supported, if this is a regular file,
+         * if SOM is enabled on the object and if SOM cache exists and valid.
+         * Otherwise do not pack Size-on-MDS attributes to the reply. */
+        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
+            !S_ISREG(ma->ma_attr.la_mode) ||
+            !mdt_object_is_som_enabled(mo) ||
+            !(ma->ma_valid & MA_SOM))
+                return;
+
+        b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+        b->size = ma->ma_som->msd_size;
+        b->blocks = ma->ma_som->msd_blocks;
 }
 
 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
@@ -433,7 +444,7 @@ static inline int mdt_body_has_lov(const struct lu_attr *la,
 }
 
 static int mdt_getattr_internal(struct mdt_thread_info *info,
-                                struct mdt_object *o)
+                                struct mdt_object *o, int ma_need)
 {
         struct md_object        *next = mdt_object_child(o);
         const struct mdt_body   *reqbody = info->mti_body;
@@ -484,6 +495,10 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                 /* get default stripe info for this dir. */
                 ma->ma_need |= MA_LOV_DEF;
         }
+        ma->ma_need |= ma_need;
+        if (ma->ma_need & MA_SOM)
+                ma->ma_som = &info->mti_u.som.data;
+
         rc = mo_attr_get(env, next, ma);
         if (unlikely(rc)) {
                 CERROR("getattr error for "DFID": %d\n",
@@ -693,7 +708,7 @@ static int mdt_getattr(struct mdt_thread_info *info)
          * remote obj, and at that time no capability is available.
          */
         mdt_set_capainfo(info, 1, &reqbody->fid1, BYPASS_CAPA);
-        rc = mdt_getattr_internal(info, obj);
+        rc = mdt_getattr_internal(info, obj, 0);
         if (reqbody->valid & OBD_MD_FLRMTPERM)
                 mdt_exit_ucred(info);
         EXIT;
@@ -792,6 +807,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         struct ldlm_lock       *lock;
         struct ldlm_res_id     *res_id;
         int                     is_resent;
+        int                     ma_need = 0;
         int                     rc;
 
         ENTRY;
@@ -886,7 +902,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         /* Finally, we can get attr for child. */
                         mdt_set_capainfo(info, 0, mdt_object_fid(child),
                                          BYPASS_CAPA);
-                        rc = mdt_getattr_internal(info, child);
+                        rc = mdt_getattr_internal(info, child, 0);
                         if (unlikely(rc != 0))
                                 mdt_object_unlock(info, child, lhc, 1);
                 }
@@ -991,38 +1007,34 @@ relock:
                         GOTO(out_child, rc);
         }
 
+        lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+        /* Get MA_SOM attributes if update lock is given. */
+        if (lock &&
+            lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE &&
+            S_ISREG(lu_object_attr(&mdt_object_child(child)->mo_lu)))
+                ma_need = MA_SOM;
+
         /* finally, we can get attr for child. */
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
-        rc = mdt_getattr_internal(info, child);
+        rc = mdt_getattr_internal(info, child, ma_need);
         if (unlikely(rc != 0)) {
                 mdt_object_unlock(info, child, lhc, 1);
-        } else {
-                lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-                if (lock) {
-                        struct mdt_body *repbody;
+        } else if (lock) {
+                /* Debugging code. */
+                res_id = &lock->l_resource->lr_name;
+                LDLM_DEBUG(lock, "Returning lock to client");
+                LASSERTF(fid_res_name_eq(mdt_object_fid(child),
+                                         &lock->l_resource->lr_name),
+                         "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
+                         (unsigned long)res_id->name[0],
+                         (unsigned long)res_id->name[1],
+                         (unsigned long)res_id->name[2],
+                         PFID(mdt_object_fid(child)));
+                mdt_pack_size2body(info, child);
+        }
+        if (lock)
+                LDLM_LOCK_PUT(lock);
 
-                        /* Debugging code. */
-                        res_id = &lock->l_resource->lr_name;
-                        LDLM_DEBUG(lock, "Returning lock to client");
-                        LASSERTF(fid_res_name_eq(mdt_object_fid(child),
-                                                 &lock->l_resource->lr_name),
-                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
-                                 (unsigned long)res_id->name[0],
-                                 (unsigned long)res_id->name[1],
-                                 (unsigned long)res_id->name[2],
-                                 PFID(mdt_object_fid(child)));
-                        /*
-                         * Pack Size-on-MDS inode attributes to the body if
-                         * update lock is given.
-                         */
-                        repbody = req_capsule_server_get(info->mti_pill,
-                                                         &RMF_MDT_BODY);
-                        if (lock->l_policy_data.l_inodebits.bits &
-                            MDS_INODELOCK_UPDATE)
-                                mdt_pack_size2body(info, child);
-                        LDLM_LOCK_PUT(lock);
-                }
-        }
         EXIT;
 out_child:
         mdt_object_put(info->mti_env, child);
@@ -4790,6 +4802,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                 lu_object_init(o, h, d);
                 lu_object_add_top(h, o);
                 o->lo_ops = &mdt_obj_ops;
+                sema_init(&mo->mot_ioepoch_sem, 1);
                 RETURN(o);
         } else
                 RETURN(NULL);
index bc14e74..d4dfbca 100644 (file)
@@ -125,7 +125,7 @@ struct mdt_device {
         /* mdt state flags */
         __u32                      mdt_fl_cfglog:1,
                                    mdt_fl_synced:1;
-        /* lock to pretect epoch and write count */
+        /* lock to protect IOepoch */
         spinlock_t                 mdt_ioepoch_lock;
         __u64                      mdt_ioepoch;
 
@@ -189,6 +189,26 @@ struct mdt_object {
         __u64                   mot_flags;
         int                     mot_ioepoch_count;
         int                     mot_writecount;
+        /* Lock to protect object's IO epoch. */
+        struct semaphore        mot_ioepoch_sem;
+};
+
+enum mdt_object_flags {
+        /** SOM attributes are changed. */
+        MOF_SOM_CHANGE  = (1 << 0),
+        /**
+         * The SOM recovery state for mdt object.
+         * This state is an in-memory equivalent of an absent SOM EA, used
+         * instead of invalidating SOM EA while IOEpoch is still opened when
+         * a client eviction occurs or a client fails to obtain SOM attributes.
+         * It indicates that the last IOEpoch holder will need to obtain SOM
+         * attributes under [0;EOF] extent lock to flush all the client's
+         * cached of evicted from MDS clients (but not necessary evicted from
+         * OST) before taking ost attributes.
+         */
+        MOF_SOM_RECOV   = (1 << 1),
+        /** File has been just created. */
+        MOF_SOM_CREATED = (1 << 2),
 };
 
 struct mdt_lock_handle {
@@ -355,6 +375,10 @@ struct mdt_thread_info {
                         /* for mdt_sendpage()      */
                         struct l_wait_info mti_wait_info;
                 } rdpg;
+                struct {
+                        struct md_attr attr;
+                        struct md_som_data data;
+                } som;
         } mti_u;
 
         /* IO epoch related stuff. */
@@ -525,8 +549,6 @@ void mdt_object_unlock_put(struct mdt_thread_info *,
 int mdt_close_unpack(struct mdt_thread_info *info);
 int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op);
 int mdt_reint_rec(struct mdt_thread_info *, struct mdt_lock_handle *);
-void mdt_pack_size2body(struct mdt_thread_info *info,
-                        struct mdt_object *o);
 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                         const struct lu_attr *attr, const struct lu_fid *fid);
 
@@ -574,17 +596,30 @@ int mdt_reint_open(struct mdt_thread_info *info,
 
 struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *,
                                      const struct lustre_handle *);
-int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o);
-void mdt_object_som_enable(struct mdt_thread_info *info, struct mdt_object *mo);
+
+enum {
+        MDT_IOEPOCH_CLOSED  = 0,
+        MDT_IOEPOCH_OPENED  = 1,
+        MDT_IOEPOCH_GETATTR = 2,
+};
+
+enum {
+        MDT_SOM_DISABLE = 0,
+        MDT_SOM_ENABLE  = 1,
+};
+
+int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
+                     int created);
 int mdt_object_is_som_enabled(struct mdt_object *mo);
-int mdt_write_get(struct mdt_device *mdt, struct mdt_object *o);
-int mdt_write_read(struct mdt_device *mdt, struct mdt_object *o);
+int mdt_write_get(struct mdt_object *o);
+void mdt_write_put(struct mdt_object *o);
+int mdt_write_read(struct mdt_object *o);
 struct mdt_file_data *mdt_mfd_new(void);
 int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd);
 void mdt_mfd_free(struct mdt_file_data *mfd);
 int mdt_close(struct mdt_thread_info *info);
 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
-                 int flags);
+                 struct md_attr *ma, int flags);
 int mdt_done_writing(struct mdt_thread_info *info);
 void mdt_shrink_reply(struct mdt_thread_info *info);
 int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *,
index 19ca96e..741e19b 100644 (file)
@@ -133,22 +133,25 @@ int mdt_object_is_som_enabled(struct mdt_object *mo)
         return !mo->mot_ioepoch;
 }
 
-/* Re-enable Size-on-MDS. */
-void mdt_object_som_enable(struct mdt_thread_info *info,
-                           struct mdt_object *mo)
+/**
+ * Re-enable Size-on-MDS.
+ * Call under ->mot_ioepoch_sem.
+ */
+static void mdt_object_som_enable(struct mdt_object *mo, __u64 ioepoch)
 {
-       spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
-       if (info->mti_ioepoch->ioepoch == mo->mot_ioepoch) {
+        if (ioepoch == mo->mot_ioepoch) {
                 LASSERT(!mdt_ioepoch_opened(mo));
                 mo->mot_ioepoch = 0;
                 mo->mot_flags = 0;
-       }
-       spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
+        }
 }
 
-/* Open the epoch. Epoch open is allowed if @writecount is not negative.
- * The epoch and writecount handling is performed under the mdt_ioepoch_lock. */
-int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o)
+/**
+ * Open the IOEpoch. It is allowed if @writecount is not negative.
+ * The epoch and writecount handling is performed under the mot_ioepoch_sem.
+ */
+int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
+                     int created)
 {
         struct mdt_device *mdt = info->mti_mdt;
         int cancel = 0;
@@ -159,27 +162,37 @@ int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o)
             !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
                 RETURN(0);
 
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        down(&o->mot_ioepoch_sem);
         if (mdt_ioepoch_opened(o)) {
                 /* Epoch continues even if there is no writers yet. */
                 CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
                        o->mot_ioepoch, PFID(mdt_object_fid(o)));
         } else {
-                if (info->mti_replayepoch > mdt->mdt_ioepoch)
+                /* XXX: ->mdt_ioepoch is not initialized at the mount */
+                spin_lock(&mdt->mdt_ioepoch_lock);
+                if (mdt->mdt_ioepoch < info->mti_replayepoch)
                         mdt->mdt_ioepoch = info->mti_replayepoch;
+
+                if (info->mti_replayepoch)
+                        o->mot_ioepoch = info->mti_replayepoch;
+                else if (++mdt->mdt_ioepoch == IOEPOCH_INVAL)
+                        o->mot_ioepoch = ++mdt->mdt_ioepoch;
                 else
-                        mdt->mdt_ioepoch++;
-                o->mot_ioepoch = info->mti_replayepoch ?
-                        info->mti_replayepoch : mdt->mdt_ioepoch;
+                        o->mot_ioepoch = mdt->mdt_ioepoch;
+
+                spin_unlock(&mdt->mdt_ioepoch_lock);
+
                 CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
-                       mdt->mdt_ioepoch, PFID(mdt_object_fid(o)));
+                       o->mot_ioepoch, PFID(mdt_object_fid(o)));
+                if (created)
+                        o->mot_flags |= MOF_SOM_CREATED;
                 cancel = 1;
         }
         o->mot_ioepoch_count++;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        up(&o->mot_ioepoch_sem);
 
-        /* Cancel Size-on-MDS attributes on clients if not truncate.
-         * In the later case, mdt_reint_setattr will do it. */
+        /* Cancel Size-on-MDS attributes cached on clients for the open case.
+         * In the truncate case, see mdt_reint_setattr(). */
         if (cancel && (info->mti_rr.rr_fid1 != NULL)) {
                 struct mdt_lock_handle  *lh = &info->mti_lh[MDT_LH_CHILD];
                 mdt_lock_reg_init(lh, LCK_EX);
@@ -191,148 +204,325 @@ int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o)
         RETURN(rc);
 }
 
-/* Update the on-disk attributes if needed and re-enable Size-on-MDS caching. */
-static int mdt_sizeonmds_update(struct mdt_thread_info *info,
-                                struct mdt_object *o)
+/**
+ * Update SOM on-disk attributes.
+ * If enabling, write update inodes and lustre-ea with the proper IOEpoch,
+ * mountid and attributes. If disabling, zero IOEpoch id in lustre-ea.
+ * Call under ->mot_ioepoch_sem.
+ */
+static int mdt_som_attr_set(struct mdt_thread_info *info,
+                            struct mdt_object *obj, __u64 ioepoch, int enable)
 {
+        struct md_attr *ma = &info->mti_attr;
+        int rc;
         ENTRY;
 
-        CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
+        CDEBUG(D_INODE, "Size-on-MDS attribute %s for epoch "LPU64
+               " on "DFID".\n", enable ? "update" : "disabling",
+               ioepoch, PFID(mdt_object_fid(obj)));
+
+        ma->ma_valid |= MA_SOM;
+        ma->ma_som = &info->mti_u.som.data;
+        if (enable) {
+                struct mdt_device *mdt = info->mti_mdt;
+                struct lu_attr *la = &ma->ma_attr;
+
+                ma->ma_som->msd_ioepoch = ioepoch;
+                ma->ma_som->msd_size = la->la_valid & LA_SIZE ? la->la_size : 0;
+                ma->ma_som->msd_blocks = la->la_valid & LA_BLOCKS ?
+                                         la->la_blocks : 0;
+                ma->ma_som->msd_mountid = mdt->mdt_mount_count;
+                ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
+        } else {
+                ma->ma_som->msd_ioepoch = IOEPOCH_INVAL;
+                ma->ma_attr.la_valid &= LA_ATIME;
+        }
+
+        /* Since we have opened the file, it is unnecessary
+         * to check permission when close it. Between the "open"
+         * and "close", maybe someone has changed the file mode
+         * or flags, or the file created mode do not permit wirte,
+         * and so on. Just set MDS_PERM_BYPASS for all the cases. */
+        ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
+
+        rc = mdt_attr_set(info, obj, ma, 0);
+        RETURN(rc);
+}
+
+/** Perform the eviction specific actions on ioepoch close. */
+static inline int mdt_ioepoch_close_on_eviction(struct mdt_thread_info *info,
+                                                struct mdt_object *o)
+{
+        int rc = 0;
+
+        down(&o->mot_ioepoch_sem);
+        CDEBUG(D_INODE, "Eviction. Closing IOepoch "LPU64" on "DFID". "
+               "Count %d\n", o->mot_ioepoch, PFID(mdt_object_fid(o)),
+               o->mot_ioepoch_count);
+        o->mot_ioepoch_count--;
+
+        /* If eviction occured set MOF_SOM_RECOV,
+         * if no other epoch holders, disable SOM on disk. */
+        o->mot_flags |= MOF_SOM_CHANGE | MOF_SOM_RECOV;
+        if (!mdt_ioepoch_opened(o)) {
+                rc = mdt_som_attr_set(info, o, o->mot_ioepoch, MDT_SOM_DISABLE);
+                mdt_object_som_enable(o, o->mot_ioepoch);
+        }
+        up(&o->mot_ioepoch_sem);
+        RETURN(rc);
+}
+
+/**
+ * Perform the replay specific actions on ioepoch close.
+ * Skip SOM attribute update if obtained and just forget about the inode state
+ * for the last ioepoch holder. The SOM cache is invalidated on MDS failure.
+ */
+static inline int mdt_ioepoch_close_on_replay(struct mdt_thread_info *info,
+                                              struct mdt_object *o)
+{
+        ENTRY;
+
+        down(&o->mot_ioepoch_sem);
+        CDEBUG(D_INODE, "Replay. Closing epoch "LPU64" on "DFID". Count %d\n",
                o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_ioepoch_count);
+        o->mot_ioepoch_count--;
+
+        if (!mdt_ioepoch_opened(o))
+                mdt_object_som_enable(o, info->mti_ioepoch->ioepoch);
+        up(&o->mot_ioepoch_sem);
 
-        if (info->mti_attr.ma_attr.la_valid & LA_SIZE) {
-                /* Do Size-on-MDS attribute update.
-                 * Size-on-MDS is re-enabled inside. */
-                /* XXX: since we have opened the file, it is unnecessary
-                 * to check permission when close it. Between the "open"
-                 * and "close", maybe someone has changed the file mode
-                 * or flags, or the file created mode do not permit wirte,
-                 * and so on. Just set MDS_PERM_BYPASS for all the cases. */
-                info->mti_attr.ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
-                info->mti_attr.ma_attr.la_valid &= LA_SIZE | LA_BLOCKS |
-                                                LA_ATIME | LA_MTIME | LA_CTIME;
-                RETURN(mdt_attr_set(info, o, 0));
-        } else
-                mdt_object_som_enable(info, o);
         RETURN(0);
 }
 
-/* Epoch closes.
- * Returns 1 if epoch does not close.
- * Returns 0 if epoch closes.
- * Returns -EAGAIN if epoch closes but an Size-on-MDS Update is still needed
- * from the client. */
-static int mdt_ioepoch_close(struct mdt_thread_info *info, struct mdt_object *o)
+/**
+ * Regular file IOepoch close.
+ * Closes the ioepoch, checks the object state, apply obtained attributes and
+ * re-enable SOM on the object, if possible.
+ *
+ * Return value:
+ * MDT_IOEPOCH_CLOSED if ioepoch is closed.
+ * MDT_IOEPOCH_GETATTR if ioepoch is closed but another SOM update is needed.
+ */
+static inline int mdt_ioepoch_close_reg(struct mdt_thread_info *info,
+                                        struct mdt_object *o)
 {
-        int eviction = (mdt_info_req(info) == NULL ? 1 : 0);
-        struct lu_attr *la = &info->mti_attr.ma_attr;
-        int achange = 0;
-        int opened;
-        int rc = 1;
+        struct md_attr *tmp_ma;
+        struct lu_attr *la;
+        int achange, opened;
+        int recovery = 0;
+        int rc = 0, ret = MDT_IOEPOCH_CLOSED;
         ENTRY;
 
-        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
-            !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
-                RETURN(0);
+        la = &info->mti_attr.ma_attr;
+        achange = (info->mti_ioepoch->flags & MF_SOM_CHANGE);
 
-        spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
+        down(&o->mot_ioepoch_sem);
+        o->mot_ioepoch_count--;
 
-        /* Epoch closes only if client tells about it or eviction occures. */
-        if (eviction || (info->mti_ioepoch->flags & MF_EPOCH_CLOSE)) {
-                LASSERT(o->mot_ioepoch_count);
-                o->mot_ioepoch_count--;
+        tmp_ma = &info->mti_u.som.attr;
+        tmp_ma->ma_lmm = info->mti_attr.ma_lmm;
+        tmp_ma->ma_lmm_size = info->mti_attr.ma_lmm_size;
+        tmp_ma->ma_som = &info->mti_u.som.data;
+        tmp_ma->ma_need = MA_INODE | MA_LOV | MA_SOM;
+        tmp_ma->ma_valid = 0;
+        rc = mo_attr_get(info->mti_env, mdt_object_child(o), tmp_ma);
+        if (rc)
+                GOTO(error_up, rc);
 
-                CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
-                       o->mot_ioepoch, PFID(mdt_object_fid(o)),
-                       o->mot_ioepoch_count);
+        /* Check the on-disk SOM state. */
+        if (o->mot_flags & MOF_SOM_RECOV)
+                recovery = 1;
+        else if (!(o->mot_flags & MOF_SOM_CREATED) &&
+                 !(tmp_ma->ma_valid & MA_SOM))
+                recovery = 1;
 
-                if (!eviction)
-                        achange = (info->mti_ioepoch->flags & MF_SOM_CHANGE);
+        CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
+               o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_ioepoch_count);
 
-                rc = 0;
-                if (!eviction && !mdt_ioepoch_opened(o)) {
-                        /* Epoch ends. Is an Size-on-MDS update needed? */
-                        if (o->mot_flags & MF_SOM_CHANGE) {
-                                /* Some previous writer changed the attribute.
-                                 * Do not believe to the current Size-on-MDS
-                                 * update, re-ask client. */
-                                rc = -EAGAIN;
-                        } else if (!(la->la_valid & LA_SIZE) && achange) {
-                                /* Attributes were changed by the last writer
-                                 * only but no Size-on-MDS update is received.*/
-                                rc = -EAGAIN;
-                        }
-                }
+        opened = mdt_ioepoch_opened(o);
+        /**
+         * If IOEpoch is not opened, check if a Size-on-MDS update is needed.
+         * Skip the check for file with no LOV  or for unlink files.
+         */
+        if (!opened && tmp_ma->ma_valid & MA_LOV &&
+            !(tmp_ma->ma_valid & MA_INODE && tmp_ma->ma_attr.la_nlink == 0)) {
+                if (recovery)
+                        /* If some previous writer was evicted, re-ask the
+                         * client for attributes. Even if attributes are
+                         * provided, we cannot believe in them.
+                         * Another use case is that there is no SOM cache on
+                         * disk -- first access with SOM or there was an MDS
+                         * failure. */
+                        ret = MDT_IOEPOCH_GETATTR;
+                else if (o->mot_flags & MOF_SOM_CHANGE)
+                        /* Some previous writer changed the attribute.
+                         * Do not believe to the current Size-on-MDS
+                         * update, re-ask client. */
+                        ret = MDT_IOEPOCH_GETATTR;
+                else if (!(la->la_valid & LA_SIZE) && achange)
+                        /* Attributes were changed by the last writer
+                         * only but no Size-on-MDS update is received.*/
+                        ret = MDT_IOEPOCH_GETATTR;
+        }
 
-                if (achange || eviction)
-                        o->mot_flags |= MF_SOM_CHANGE;
+        if (achange || ret == MDT_IOEPOCH_GETATTR)
+                o->mot_flags |= MOF_SOM_CHANGE;
+
+        /* If epoch ends and relable SOM attributes are obtained, update them.
+         * Create SOM ea for new files even if there is no attributes obtained
+         * (0-length file). */
+        if (ret == MDT_IOEPOCH_CLOSED && !opened) {
+                if (achange || o->mot_flags & MOF_SOM_CREATED) {
+                        LASSERT(achange || !(la->la_valid & LA_SIZE));
+                        rc = mdt_som_attr_set(info, o, o->mot_ioepoch,
+                                              MDT_SOM_ENABLE);
+                        /* Avoid the following setattrs of these attributes,
+                         * e.g. for atime update. */
+                        info->mti_attr.ma_valid = 0;
+                }
+                mdt_object_som_enable(o, o->mot_ioepoch);
         }
 
-        opened = mdt_ioepoch_opened(o);
-        spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
-
-        /* If eviction occurred, do nothing. */
-        if ((rc == 0) && !opened && !eviction) {
-                /* Epoch ends and wanted Size-on-MDS update is obtained. */
-                rc = mdt_sizeonmds_update(info, o);
-                /* Avoid the following setattrs of these attributes, e.g.
-                 * for atime update. */
-                info->mti_attr.ma_valid = 0;
+        EXIT;
+error_up:
+        up(&o->mot_ioepoch_sem);
+        return rc ? : ret;
+}
+
+/**
+ * Close IOEpoch (opened file or FMODE_EPOCH state). It happens if:
+ * - a client closes the IOEpoch;
+ * - a client eviction occured.
+ * Return values:
+ * MDT_IOEPOCH_OPENED if the client does not close IOEpoch.
+ * MDT_IOEPOCH_CLOSED if the client closes IOEpoch.
+ * MDT_IOEPOCH_GETATTR if the client closes IOEpoch but another SOM attribute
+ * update is needed.
+ */
+static int mdt_ioepoch_close(struct mdt_thread_info *info, struct mdt_object *o)
+{
+        struct ptlrpc_request *req = mdt_info_req(info);
+        ENTRY;
+
+        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
+            !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
+                RETURN(0);
+
+        LASSERT(o->mot_ioepoch_count);
+        LASSERT(info->mti_ioepoch == NULL ||
+                info->mti_ioepoch->ioepoch == o->mot_ioepoch);
+
+        /* IOEpoch is closed only if client tells about it or eviction occures.
+         * In the replay case, always close the epoch. */
+        if (req == NULL)
+                RETURN(mdt_ioepoch_close_on_eviction(info, o));
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                RETURN(mdt_ioepoch_close_on_replay(info, o));
+        if (info->mti_ioepoch->flags & MF_EPOCH_CLOSE)
+                RETURN(mdt_ioepoch_close_reg(info, o));
+        /* IO epoch is not closed. */
+        RETURN(MDT_IOEPOCH_OPENED);
+}
+
+/**
+ * Close FMODE_SOM state, when IOEpoch is already closed and we are waiting for
+ * attribute update. It happens if:
+ * - SOM Attribute Update is obtained;
+ * - the client failed to obtain it and informs MDS about it;
+ * - a client eviction occured.
+ * Apply obtained attributes for the 1st case, wipe out the on-disk SOM
+ * cache otherwise.
+ */
+int mdt_som_au_close(struct mdt_thread_info *info, struct mdt_object *o)
+{
+        struct ptlrpc_request *req = mdt_info_req(info);
+        __u64 ioepoch = 0;
+        int act = MDT_SOM_ENABLE;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(!req || info->mti_ioepoch);
+        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
+            !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
+                RETURN(0);
+
+        /* No size whereas MF_SOM_CHANGE is set means client failed to
+         * obtain ost attributes, drop the SOM cache on disk if so. */
+        if (!req ||
+            (info->mti_ioepoch &&
+             info->mti_ioepoch->flags & MF_SOM_CHANGE &&
+             !(info->mti_attr.ma_attr.la_valid & LA_SIZE)))
+                act = MDT_SOM_DISABLE;
+
+        down(&o->mot_ioepoch_sem);
+        /* Mark the object it is the recovery state if we failed to obtain
+         * SOM attributes. */
+        if (act == MDT_SOM_DISABLE)
+                o->mot_flags |= MOF_SOM_RECOV;
+
+        if (!mdt_ioepoch_opened(o)) {
+                ioepoch =  info->mti_ioepoch ?
+                        info->mti_ioepoch->ioepoch : o->mot_ioepoch;
+
+                if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
+                        rc = mdt_som_attr_set(info, o, ioepoch, act);
+                mdt_object_som_enable(o, ioepoch);
         }
+        up(&o->mot_ioepoch_sem);
         RETURN(rc);
 }
 
-int mdt_write_read(struct mdt_device *mdt, struct mdt_object *o)
+int mdt_write_read(struct mdt_object *o)
 {
         int rc = 0;
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        down(&o->mot_ioepoch_sem);
         rc = o->mot_writecount;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        up(&o->mot_ioepoch_sem);
         RETURN(rc);
 }
 
-int mdt_write_get(struct mdt_device *mdt, struct mdt_object *o)
+int mdt_write_get(struct mdt_object *o)
 {
         int rc = 0;
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        down(&o->mot_ioepoch_sem);
         if (o->mot_writecount < 0)
                 rc = -ETXTBSY;
         else
                 o->mot_writecount++;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        up(&o->mot_ioepoch_sem);
         RETURN(rc);
 }
 
-static void mdt_write_put(struct mdt_device *mdt, struct mdt_object *o)
+void mdt_write_put(struct mdt_object *o)
 {
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        down(&o->mot_ioepoch_sem);
         o->mot_writecount--;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        up(&o->mot_ioepoch_sem);
         EXIT;
 }
 
-static int mdt_write_deny(struct mdt_device *mdt, struct mdt_object *o)
+static int mdt_write_deny(struct mdt_object *o)
 {
         int rc = 0;
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        down(&o->mot_ioepoch_sem);
         if (o->mot_writecount > 0)
                 rc = -ETXTBSY;
         else
                 o->mot_writecount--;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        up(&o->mot_ioepoch_sem);
         RETURN(rc);
 }
 
-static void mdt_write_allow(struct mdt_device *mdt, struct mdt_object *o)
+static void mdt_write_allow(struct mdt_object *o)
 {
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        down(&o->mot_ioepoch_sem);
         o->mot_writecount++;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        up(&o->mot_ioepoch_sem);
         EXIT;
 }
 
@@ -422,13 +612,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
         }
 
         if (flags & FMODE_WRITE) {
-                rc = mdt_write_get(info->mti_mdt, o);
+                rc = mdt_write_get(o);
                 if (rc == 0) {
-                        mdt_ioepoch_open(info, o);
+                        mdt_ioepoch_open(info, o, created);
                         repbody->ioepoch = o->mot_ioepoch;
                 }
         } else if (flags & MDS_FMODE_EXEC) {
-                rc = mdt_write_deny(info->mti_mdt, o);
+                rc = mdt_write_deny(o);
         }
         if (rc)
                 RETURN(rc);
@@ -1232,19 +1422,22 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
         struct mdt_object *o = mfd->mfd_object;
         struct md_object *next = mdt_object_child(o);
         struct md_attr *ma = &info->mti_attr;
-        int rc = 0, ret = 0;
+        int ret = MDT_IOEPOCH_CLOSED;
+        int rc = 0;
         int mode;
         ENTRY;
 
         mode = mfd->mfd_mode;
 
         if ((mode & FMODE_WRITE) || (mode & FMODE_TRUNC)) {
-                mdt_write_put(info->mti_mdt, o);
+                mdt_write_put(o);
                 ret = mdt_ioepoch_close(info, o);
         } else if (mode & MDS_FMODE_EXEC) {
-                mdt_write_allow(info->mti_mdt, o);
+                mdt_write_allow(o);
         } else if (mode & FMODE_EPOCH) {
                 ret = mdt_ioepoch_close(info, o);
+        } else if (mode & FMODE_SOM) {
+                ret = mdt_som_au_close(info, o);
         }
 
         /* Update atime on close only. */
@@ -1260,22 +1453,15 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
 
         if (!MFD_CLOSED(mode))
                 rc = mo_close(info->mti_env, next, ma);
-        else if (ret == -EAGAIN)
-                rc = mo_attr_get(info->mti_env, next, ma);
-
-        /* If the object is unlinked, do not try to re-enable SIZEONMDS */
-        if ((ret == -EAGAIN) && (ma->ma_valid & MA_INODE) &&
-            (ma->ma_attr.la_nlink == 0)) {
-                ret = 0;
-        }
 
-        if ((ret == -EAGAIN) || (ret == 1)) {
+        if (ret == MDT_IOEPOCH_GETATTR || ret == MDT_IOEPOCH_OPENED) {
                 struct mdt_export_data *med;
 
-                /* The epoch has not closed or Size-on-MDS update is needed.
+                /* The IOepoch is still opened or SOM update is needed.
                  * Put mfd back into the list. */
                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
-                mdt_mfd_set_mode(mfd, (ret == 1 ? FMODE_EPOCH : FMODE_SOM));
+                mdt_mfd_set_mode(mfd, ret == MDT_IOEPOCH_OPENED ?
+                                      FMODE_EPOCH : FMODE_SOM);
 
                 LASSERT(mdt_info_req(info));
                 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
@@ -1284,9 +1470,10 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
                 class_handle_hash_back(&mfd->mfd_handle);
                 spin_unlock(&med->med_open_lock);
 
-                if (ret == 1) {
+                if (ret == MDT_IOEPOCH_OPENED) {
                         ret = 0;
                 } else {
+                        ret = -EAGAIN;
                         CDEBUG(D_INODE, "Size-on-MDS attribute update is "
                                "needed on "DFID"\n", PFID(mdt_object_fid(o)));
                 }
@@ -1384,8 +1571,17 @@ int mdt_close(struct mdt_thread_info *info)
         RETURN(rc ? rc : ret);
 }
 
+/**
+ * DONE_WRITING rpc handler.
+ *
+ * As mfd is not kept after replayed CLOSE (see mdt_ioepoch_close_on_replay()),
+ * only those DONE_WRITING rpc will be replayed which really wrote smth on disk,
+ * and got a trasid. Waiting for such DONE_WRITING is not reliable, so just
+ * skip attributes and reconstruct the reply here.
+ */
 int mdt_done_writing(struct mdt_thread_info *info)
 {
+        struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_body         *repbody = NULL;
         struct mdt_export_data  *med;
         struct mdt_file_data    *mfd;
@@ -1407,7 +1603,7 @@ int mdt_done_writing(struct mdt_thread_info *info)
                 RETURN(err_serious(rc));
 
         if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
-                RETURN(lustre_msg_get_status(mdt_info_req(info)->rq_repmsg));
+                RETURN(lustre_msg_get_status(req->rq_repmsg));
 
         med = &info->mti_exp->exp_mdt_data;
         spin_lock(&med->med_open_lock);
@@ -1419,6 +1615,11 @@ int mdt_done_writing(struct mdt_thread_info *info)
                        PFID(info->mti_rr.rr_fid1),
                        info->mti_ioepoch->handle.cookie,
                        info->mti_ioepoch->ioepoch);
+                /* If this is a replay, reconstruct the transno. */
+                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                        mdt_empty_transno(info);
+                        RETURN(0);
+                }
                 RETURN(-ESTALE);
         }
 
@@ -1431,7 +1632,15 @@ int mdt_done_writing(struct mdt_thread_info *info)
         /* Set EPOCH CLOSE flag if not set by client. */
         info->mti_ioepoch->flags |= MF_EPOCH_CLOSE;
         info->mti_attr.ma_valid = 0;
+
+        info->mti_attr.ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
+        OBD_ALLOC(info->mti_attr.ma_lmm, info->mti_attr.ma_lmm_size);
+        if (info->mti_attr.ma_lmm == NULL)
+                RETURN(-ENOMEM);
+
         rc = mdt_mfd_close(info, mfd);
+
+        OBD_FREE(info->mti_attr.ma_lmm, info->mti_attr.ma_lmm_size);
         mdt_empty_transno(info);
         RETURN(rc);
 }
index 2e62360..dc99028 100644 (file)
@@ -274,11 +274,10 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
  *               the client holds a lock already.
  * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
  * mdt_setattr_unpack()) flag to tell these cases apart. */
-int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
+int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
+                 struct md_attr *ma, int flags)
 {
-        struct md_attr          *ma = &info->mti_attr;
         struct mdt_lock_handle  *lh;
-        int som_update = 0;
         int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
         int rc;
         ENTRY;
@@ -286,14 +285,6 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
         /* attr shouldn't be set on remote object */
         LASSERT(mdt_object_exists(mo) >= 0);
 
-        if (exp_connect_som(info->mti_exp) && info->mti_ioepoch)
-                som_update = (info->mti_ioepoch->flags & MF_SOM_CHANGE);
-
-        /* Try to avoid object_lock if another epoch has been started
-         * already. */
-        if (som_update && (info->mti_ioepoch->ioepoch != mo->mot_ioepoch))
-                RETURN(0);
-
         lh = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_reg_init(lh, LCK_PW);
 
@@ -307,12 +298,6 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
                         RETURN(rc);
         }
 
-        /* Setattrs are syncronized through dlm lock taken above. If another
-         * epoch started, its attributes may be already flushed on disk,
-         * skip setattr. */
-        if (som_update && (info->mti_ioepoch->ioepoch != mo->mot_ioepoch))
-                GOTO(out_unlock, rc = 0);
-
         if (mdt_object_exists(mo) == 0)
                 GOTO(out_unlock, rc = -ENOENT);
 
@@ -337,15 +322,6 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
         if (rc != 0)
                 GOTO(out_unlock, rc);
 
-        /* Re-enable SIZEONMDS. */
-        if (som_update) {
-                CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID" size "LPU64
-                       ". Count %d\n", mo->mot_ioepoch,
-                       PFID(mdt_object_fid(mo)), ma->ma_attr.la_size,
-                       mo->mot_ioepoch_count);
-                mdt_object_som_enable(info, mo);
-        }
-
         EXIT;
 out_unlock:
         mdt_object_unlock(info, mo, lh, rc);
@@ -363,7 +339,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         struct mdt_object       *mo;
         struct md_object        *next;
         struct mdt_body         *repbody;
-        int                      rc;
+        int                      som_au, rc;
         ENTRY;
 
         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
@@ -382,21 +358,23 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
                     (rr->rr_flags & MRF_SETATTR_LOCKED)) {
                         /* Check write access for the O_TRUNC case */
-                        if (mdt_write_read(info->mti_mdt, mo) < 0)
+                        if (mdt_write_read(mo) < 0)
                                 GOTO(out_put, rc = -ETXTBSY);
                 }
         } else if (info->mti_ioepoch &&
                    (info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
-                /* Truncate case. */
-                rc = mdt_write_get(info->mti_mdt, mo);
+                /* Truncate case. IOEpoch is opened. */
+                rc = mdt_write_get(mo);
                 if (rc)
                         GOTO(out_put, rc);
 
                 mfd = mdt_mfd_new();
-                if (mfd == NULL)
+                if (mfd == NULL) {
+                        mdt_write_put(mo);
                         GOTO(out_put, rc = -ENOMEM);
+                }
 
-                mdt_ioepoch_open(info, mo);
+                mdt_ioepoch_open(info, mo, 0);
                 repbody->ioepoch = mo->mot_ioepoch;
 
                 mdt_object_get(info->mti_env, mo);
@@ -410,26 +388,21 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
         }
 
-        if (info->mti_ioepoch && (info->mti_ioepoch->flags & MF_SOM_CHANGE))
-                ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
-
-        rc = mdt_attr_set(info, mo, rr->rr_flags);
-        if (rc)
-                GOTO(out_put, rc);
-
-        if (info->mti_ioepoch && (info->mti_ioepoch->flags & MF_SOM_CHANGE)) {
+        som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE;
+        if (som_au) {
+                /* SOM Attribute update case. Find the proper mfd and update
+                 * SOM attributes on the proper object. */
                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
                 LASSERT(info->mti_ioepoch);
 
                 spin_lock(&med->med_open_lock);
-                /* Size-on-MDS Update. Find and free mfd. */
                 mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
                 if (mfd == NULL) {
                         spin_unlock(&med->med_open_lock);
-                        CDEBUG(D_INODE | D_ERROR, "no handle for file close: "
-                                        "fid = "DFID": cookie = "LPX64"\n",
-                                        PFID(info->mti_rr.rr_fid1),
-                                        info->mti_ioepoch->handle.cookie);
+                        CDEBUG(D_INODE, "no handle for file close: "
+                               "fid = "DFID": cookie = "LPX64"\n",
+                               PFID(info->mti_rr.rr_fid1),
+                               info->mti_ioepoch->handle.cookie);
                         GOTO(out_put, rc = -ESTALE);
                 }
                 LASSERT(mfd->mfd_mode == FMODE_SOM);
@@ -439,7 +412,19 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 list_del_init(&mfd->mfd_list);
                 spin_unlock(&med->med_open_lock);
 
+                /* Close the found mfd, update attributes. */
+                ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
+                OBD_ALLOC(ma->ma_lmm, ma->ma_lmm_size);
+                if (ma->ma_lmm == NULL)
+                        GOTO(out_put, rc = -ENOMEM);
+
                 mdt_mfd_close(info, mfd);
+
+                OBD_FREE(ma->ma_lmm, ma->ma_lmm_size);
+        } else {
+                rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
+                if (rc)
+                        GOTO(out_put, rc);
         }
 
         ma->ma_need = MA_INODE;
@@ -454,7 +439,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         if (info->mti_mdt->mdt_opts.mo_oss_capa &&
             info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA &&
             S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) &&
-            (ma->ma_attr.la_valid & LA_SIZE)) {
+            (ma->ma_attr.la_valid & LA_SIZE) && !som_au) {
                 struct lustre_capa *capa;
 
                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
index 3affb6f..79294e0 100644 (file)
@@ -5930,9 +5930,12 @@ test_132() { #1028, SOM
         stat $DIR/$tfile >/dev/null
         gl2=$(get_ost_param "ldlm_glimpse_enqueue")
         echo "====> SOM is "$som1", "$((gl2 - gl1))" glimpse RPC occured"
-        cancel_lru_locks osc
+        rm $DIR/$tfile
         som_mode_switch $som1 $gl1 $gl2
 
+        dd if=/dev/zero of=$DIR/$tfile count=1 2>/dev/null
+        cancel_lru_locks osc
+
         som2=$(do_facet $mymds "$LCTL get_param mdt.*.som" |  awk -F= ' {print $2}' | head -n 1)
         if [ $som1 == $som2 ]; then
             error "som is still "$som2