Whamcloud - gitweb
- additional debug to catch the case when client gets openhandle w/o capa
[fs/lustre-release.git] / lustre / mds / mds_open.c
index 89f7665..f00cac4 100644 (file)
@@ -43,6 +43,7 @@
 #include <linux/obd_lov.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/lprocfs_status.h>
+#include <linux/lustre_gs.h>
 
 #include "mds_internal.h"
 
@@ -86,14 +87,14 @@ struct mds_file_data *mds_mfd_new(void)
         return mfd;
 }
 
-static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
+struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
 {
         ENTRY;
         LASSERT(handle != NULL);
         RETURN(class_handle2object(handle->cookie));
 }
 
-static void mds_mfd_put(struct mds_file_data *mfd)
+void mds_mfd_put(struct mds_file_data *mfd)
 {
         CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
                atomic_read(&mfd->mfd_refcount) - 1);
@@ -167,13 +168,16 @@ static int mds_get_write_access(struct mds_obd *mds, struct inode *inode,
                 rc = -ENOMEM;
                 goto out;
         }
-        if (epoch > mds->mds_io_epoch)
+        if (epoch > mds->mds_io_epoch) {
                 mds->mds_io_epoch = epoch;
-        else
+                CDEBUG(D_INODE, "repair MDS epoch "LPU64" for ino %lu/%u\n",
+                       mds->mds_io_epoch, inode->i_ino, inode->i_generation);
+        } else {
                 mds->mds_io_epoch++;
+                CDEBUG(D_INODE, "starting MDS epoch "LPU64" for ino %lu/%u\n",
+                       mds->mds_io_epoch, inode->i_ino, inode->i_generation);
+        }
         MDS_FILTERDATA(inode)->io_epoch = mds->mds_io_epoch;
-        CDEBUG(D_INODE, "starting MDS epoch "LPU64" for ino %lu/%u\n",
-               mds->mds_io_epoch, inode->i_ino, inode->i_generation);
  out:
         if (rc == 0)
                 atomic_inc(&inode->i_writecount);
@@ -233,7 +237,8 @@ int mds_query_write_access(struct inode *inode)
 /* This replaces the VFS dentry_open, it manages mfd and writecount */
 static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
                                              struct vfsmount *mnt, int flags,
-                                             struct ptlrpc_request *req)
+                                             struct ptlrpc_request *req,
+                                             struct mds_update_record *rec)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
         struct mds_obd *mds = mds_req2mds(req);
@@ -252,7 +257,7 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
 
         if (flags & FMODE_WRITE) {
                 /* FIXME: in recovery, need to pass old epoch here */
-                rc = mds_get_write_access(mds, dentry->d_inode, 0);
+                rc = mds_get_write_access(mds, dentry->d_inode, rec->ur_ioepoch);
                 if (rc)
                         GOTO(cleanup_mfd, rc);
                 body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch;
@@ -440,7 +445,11 @@ mds_create_objects(struct obd_device *obd, struct ptlrpc_request *req,
                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE);
                 
-                rc = obd_setattr(mds->mds_dt_exp, oa, lsm, &oti);
+                /* pack lustre id to oss */
+                *(obdo_id(oa)) = body->id1;
+                oa->o_valid |= OBD_MD_FLIFID;
+
+                rc = obd_setattr(mds->mds_dt_exp, oa, lsm, &oti, NULL);
                 if (rc) {
                         CERROR("error setting attrs for inode %lu: rc %d\n",
                                inode->i_ino, rc);
@@ -580,7 +589,13 @@ static void reconstruct_open(struct mds_update_record *rec, int offset,
                 return; /* error looking up parent or child */
         }
 
-        if (rec->ur_namelen == 1) {
+        /* first, we try to open the file by fid. by the time of this
+         * request, inode can be an orphan and parent can disappear */
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                CDEBUG(D_HA, "OPEN by fid "DLID4" (RESENT|REPLAY)\n",
+                       OLID4(rec->ur_id2));
+                dchild = mds_id2dentry(obd, rec->ur_id2, NULL);
+        } else if (rec->ur_namelen == 1) {
                 CDEBUG(D_HA, "OPEN by fid "DLID4" (RESENT)\n",
                        OLID4(rec->ur_id1));
                 dchild = mds_id2dentry(obd, rec->ur_id1, NULL);
@@ -617,6 +632,7 @@ static void reconstruct_open(struct mds_update_record *rec, int offset,
                 if (!(body->valid & OBD_MD_FLEASIZE))
                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
+                DEBUG_REQ(D_ERROR, req, "no capa for "DLID4, OLID4(&body->id1));
         }
         
         /* If we have -EEXIST as the status, and we were asked to create
@@ -656,7 +672,7 @@ static void reconstruct_open(struct mds_update_record *rec, int offset,
                 mntget(mds->mds_vfsmnt);
                 CERROR("Re-opened file \n");
                 mfd = mds_dentry_open(dchild, mds->mds_vfsmnt,
-                                      rec->ur_flags & ~MDS_OPEN_TRUNC, req);
+                                      rec->ur_flags & ~MDS_OPEN_TRUNC, req, rec);
                 if (!mfd) {
                         CERROR("mds: out of memory\n");
                         GOTO(out_dput, req->rq_status = -ENOMEM);
@@ -677,7 +693,7 @@ static void reconstruct_open(struct mds_update_record *rec, int offset,
 }
 
 /* do NOT or the MAY_*'s, you'll get the weakest */
-static int accmode(int flags)
+int accmode(int flags)
 {
         int res = 0;
 
@@ -697,10 +713,11 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_obd *mds = mds_req2mds(req);
+        struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
         struct mds_file_data *mfd = NULL;
         obd_id *ids = NULL;
         unsigned mode;
-        int rc = 0;
+        int rc = 0, reply_off;
         ENTRY;
 
         /* atomically create objects if necessary */
@@ -748,17 +765,47 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
                         }
                 }
         }
-        
-        rc = mds_pack_acl(obd, req->rq_repmsg, 3, body, dchild->d_inode);
+
+        reply_off = 3;
+        rc = mds_pack_acl(req, reply_off, body, dchild->d_inode);
+        reply_off += 2;
+
         if (rc < 0) {
-                CERROR("mds_pack_acl: rc = %d\n", rc);
+                CERROR("pack posix acl: rc = %d\n", rc);
                 up(&dchild->d_inode->i_sem);
                 RETURN(rc);
         }
 
+        rc = mds_pack_gskey(obd, req->rq_repmsg, &reply_off, body, 
+                            dchild->d_inode); 
+        if (rc < 0) {
+                CERROR("mds_pack_gskey: rc = %d\n", rc);
+                up(&dchild->d_inode->i_sem);
+                RETURN(rc);
+        }
+
+        if (S_ISREG(mode)) {
+                struct lustre_capa capa = {
+                        .lc_uid   = rec->ur_uc.luc_uid,
+                        .lc_op    = capa_op(rec->ur_flags),
+                        .lc_ino   = dchild->d_inode->i_ino,
+                        .lc_igen  = dchild->d_inode->i_generation,
+                        .lc_mdsid = mds->mds_num,
+                };
+
+                rc = mds_pack_capa(obd, med, NULL, &capa, req,
+                                   &reply_off, body);
+                if (rc < 0) {
+                        CERROR("mds_pack_capa: rc = %d\n", rc);
+                        up(&dchild->d_inode->i_sem);
+                        RETURN(rc);
+                }
+        } else {
+                reply_off++;
+        }
+
         /* If the inode has no EA data, then MDSs hold size, mtime */
-        if (S_ISREG(dchild->d_inode->i_mode) &&
-            !(body->valid & OBD_MD_FLEASIZE)) {
+        if (S_ISREG(mode) && !(body->valid & OBD_MD_FLEASIZE)) {
                 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                                 OBD_MD_FLATIME | OBD_MD_FLMTIME);
         }
@@ -766,7 +813,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
         up(&dchild->d_inode->i_sem);
 
         intent_set_disposition(rep, DISP_OPEN_OPEN);
-        mfd = mds_dentry_open(dchild, mds->mds_vfsmnt, flags, req);
+        mfd = mds_dentry_open(dchild, mds->mds_vfsmnt, flags, req, rec);
         if (IS_ERR(mfd))
                 RETURN(PTR_ERR(mfd));
 
@@ -821,10 +868,8 @@ static int mds_open_by_id(struct ptlrpc_request *req,
         }
         l_dput(dchild);
 
-        /*
-         * we didn't find it in PENDING so it isn't an orphan.  See if it was a
-         * regular inode that was previously created.
-         */
+        /* we didn't find it in PENDING so it isn't an orphan.  See if it was a
+         * regular inode that was previously created. */
         dchild = mds_id2dentry(req2obd(req), id, NULL);
         if (IS_ERR(dchild))
                 RETURN(PTR_ERR(dchild));
@@ -915,6 +960,8 @@ int mds_open(struct mds_update_record *rec, int offset,
         struct mea *mea = NULL;
         int mea_size, update_mode;
         int child_mode = LCK_PR;
+        struct lustre_id sid;
+        __u64 fid = 0;
         ENTRY;
 
         DEBUG_REQ(D_INODE, req, "parent "DLID4" name %*s mode %o",
@@ -1017,7 +1064,7 @@ restart:
         if (rc)
                 GOTO(cleanup, rc);
        
-        if (mea != NULL) {
+        if (mea != NULL && mea->mea_count) {
                 /*
                  * dir is already splitted, check is requested filename should *
                  * live at this MDS or at another one.
@@ -1115,6 +1162,14 @@ got_child:
                 struct iattr iattr;
                 struct inode *inode;
                 
+                /* get parent id: ldlm lock on the parent protects ea */
+                rc = mds_read_inode_sid(obd, dparent->d_inode, &sid);
+                if (rc) {
+                        CERROR("can't read parent inode id. ino(%lu) rc(%d)\n",
+                                dparent->d_inode->i_ino, rc);
+                        GOTO(cleanup, rc);
+                }
+
                 ino = (unsigned long)id_ino(rec->ur_id2);
                 
                 rc = mds_try_to_split_dir(obd, dparent, &mea, 0,
@@ -1148,9 +1203,17 @@ got_child:
                         handle = NULL;
                         GOTO(cleanup, rc);
                 }
+                if (id_fid(rec->ur_id2))
+                        fid = id_fid(rec->ur_id2); 
+                else 
+                        fid = mds_alloc_fid(obd);
+                
                 dchild->d_fsdata = (void *) &dp;
                 dp.p_ptr = req;
                 dp.p_inum = ino;
+                
+                dp.p_fid = fid;
+                dp.p_group = mds->mds_num;
 
                 rc = ll_vfs_create(dparent->d_inode, dchild, rec->ur_mode, NULL);
                 if (dchild->d_fsdata == (void *)(unsigned long)ino)
@@ -1160,7 +1223,9 @@ got_child:
                         CDEBUG(D_INODE, "error during create: %d\n", rc);
                         GOTO(cleanup, rc);
                 }
+
                 inode = dchild->d_inode;
+
                 if (ino) {
                         LASSERT(ino == inode->i_ino);
                         
@@ -1197,35 +1262,20 @@ got_child:
                         MD_COUNTER_INCREMENT(obd, create);
                 }
 
-                if (ino) {
-                        rc = mds_update_inode_sid(obd, dchild->d_inode,
-                                                  handle, rec->ur_id2);
-                        if (rc) {
-                                CERROR("mds_update_inode_sid() failed, "
-                                       "rc = %d\n", rc);
-                        }
-                        id_assign_fid(&body->id1, rec->ur_id2);
+                mds_inode2id(obd, &body->id1, inode, fid);
+                mds_update_inode_ids(obd, dchild->d_inode, handle, &body->id1, &sid);
 
-                        /* 
-                         * make sure, that fid is up-to-date.
-                         */
-                        mds_set_last_fid(obd, id_fid(rec->ur_id2));
-                } else {
-                        rc = mds_alloc_inode_sid(obd, dchild->d_inode,
-                                                 handle, &body->id1);
+                if ((rec->ur_flags & MDS_OPEN_HAS_KEY)) { 
+                        rc = mds_set_gskey(obd, handle, dchild->d_inode, 
+                                           rec->ur_ea2data, rec->ur_ea2datalen, 
+                                           ATTR_KEY | ATTR_MAC);
                         if (rc) {
-                                CERROR("mds_alloc_inode_sid() failed, "
-                                       "rc = %d\n", rc);
+                                CERROR("error in set gs key rc %d\n", rc); 
                         }
                 }
 
-                if (!(rec->ur_flags & O_EXCL)) { /* bug 3313 */
-                        rc = fsfilt_commit(obd, dchild->d_inode->i_sb,
-                                           dchild->d_inode, handle, 
-                                           req->rq_export->exp_sync);
-                        handle = NULL;
-                }
-
+                rc = fsfilt_commit(obd, dchild->d_inode->i_sb, dchild->d_inode, handle, 0);
+                handle = NULL;
                 acc_mode = 0;           /* Don't check for permissions */
         }
         mds_pack_inode2body(obd, body, dchild->d_inode, 1);
@@ -1336,7 +1386,6 @@ got_child:
         }
 #else
 /* re-enable test 24n in sanity.sh: it needs LOOKUP lock on open */
-#warning "disable opencache lock for CMD2"
 #endif
 
         /* Step 5: mds_open it */
@@ -1345,6 +1394,17 @@ got_child:
         if (rc)
                 GOTO(cleanup, rc);
 
+        /* reintegration case */
+        if ((rec->ur_flags & MDS_REINT_REQ)) {
+                rc = mds_fidmap_add(obd, &body->id1);
+                if (rc < 0) {
+                        CERROR("can't create fid->ino mapping, err %d\n",
+                               rc);
+                } else {
+                       rc = 0;
+               }
+        }
+        
         /* if this is a writer, we have to invalidate client's
          * update locks in order to make sure they don't use
          * isize/iblocks from mds anymore.
@@ -1354,7 +1414,7 @@ got_child:
                 struct ldlm_res_id child_res_id = { .name = {0}};
                 ldlm_policy_data_t sz_policy;
                 struct lustre_handle sz_lockh;
-                int lock_flags = 0;
+                int lock_flags = LDLM_FL_ATOMIC_CB;
 
                 child_res_id.name[0] = id_fid(&body->id1);
                 child_res_id.name[1] = id_group(&body->id1);
@@ -1394,6 +1454,12 @@ cleanup_no_trans:
                 } else if (created) {
                         mds_lock_new_child(obd, dchild->d_inode, NULL);
                 }
+                /* audit stuff for OPEN */
+                if (offset == 3) {
+                        mds_audit(req, dchild, rec->ur_name,
+                                  rec->ur_namelen - 1, AUDIT_OPEN, rc);
+                }
+
                 l_dput(dchild);
         case 1:
                 if (dparent == NULL)
@@ -1462,6 +1528,28 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset,
                 reply_body = lustre_msg_buf(req->rq_repmsg, 0,
                                             sizeof(*reply_body));
 
+        if (request_body && (request_body->valid & OBD_MD_FLSIZE)) {
+                /* we set i_size/i_blocks here, nobody will see
+                 * them until all write references are dropped.
+                 * btw, we hold one reference */
+                LASSERT(mfd->mfd_mode & FMODE_WRITE);
+                LASSERT(request_body->valid & OBD_MD_FLEPOCH);
+                LASSERT(MDS_FILTERDATA(inode));
+                if (MDS_FILTERDATA(inode)->io_epoch != request_body->io_epoch)
+                        CDEBUG(D_ERROR, "try to update attr. for old epoch "
+                               LPD64" while current "LPD64"\n",
+                               MDS_FILTERDATA(inode)->io_epoch,
+                               request_body->io_epoch);
+                i_size_write(inode, request_body->size);
+                inode->i_blocks = request_body->blocks;
+                LTIME_S(inode->i_mtime) = (request_body->mtime);
+
+                LTIME_S(iattr.ia_mtime) = request_body->mtime;
+                iattr.ia_size = inode->i_size;
+                iattr.ia_valid |= ATTR_SIZE|ATTR_MTIME;
+                mds_inode_unset_attrs_old(inode);
+        }
+
         idlen = ll_id2str(idname, inode->i_ino, inode->i_generation);
         CDEBUG(D_INODE, "inode %p ino %s nlink %d orphan %d\n", inode, 
                idname, inode->i_nlink, mds_orphan_open_count(inode));
@@ -1553,17 +1641,9 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset,
                }
 
                 goto out; /* Don't bother updating attrs on unlinked inode */
-        } else if ((mfd->mfd_mode & FMODE_WRITE) && rc == 0 && request_body) {
+        } else if ((mfd->mfd_mode & FMODE_WRITE) && rc == 0) {
                 /* last writer closed file - let's update i_size/i_blocks */
-                if (request_body->valid & OBD_MD_FLSIZE) {
-                        LASSERT(request_body->valid & OBD_MD_FLBLOCKS);
-                        CDEBUG(D_OTHER, "update size "LPD64" for "DLID4
-                               ", epoch "LPD64"\n", inode->i_size,
-                               OLID4(&request_body->id1),
-                               request_body->io_epoch);
-                        iattr.ia_size = inode->i_size;
-                        iattr.ia_valid |= ATTR_SIZE;
-                }
+                mds_validate_size(obd, inode, request_body, &iattr);
         }
 
 #if 0
@@ -1624,6 +1704,13 @@ out:
         /* If other clients have this file open for write, rc will be > 0 */
         if (rc > 0)
                 rc = 0;
+        if (!obd->obd_recovering && mds_inode_has_old_attrs(inode)
+                        && !mds_inode_is_orphan(inode)
+                        && atomic_read(&inode->i_writecount) == 0
+                        && inode->i_nlink != 0) {
+                CERROR("leave inode %lu/%u with old attributes (nlink = %d)\n",
+                       inode->i_ino, inode->i_generation, inode->i_nlink);
+        }
         l_dput(mfd->mfd_dentry);
         mds_mfd_destroy(mfd);
 
@@ -1683,12 +1770,12 @@ static int mds_extent_lock_callback(struct ldlm_lock *lock,
 }
 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
 __u64 lov_merge_blocks(struct lov_stripe_md *lsm);
+__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
 
-int mds_validate_size(struct obd_device *obd, struct mds_body *body,
-                      struct mds_file_data *mfd)
+int mds_validate_size(struct obd_device *obd, struct inode *inode,
+                      struct mds_body *body, struct iattr *iattr)
 {
         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
-        struct inode *inode = mfd->mfd_dentry->d_inode;
         struct lustre_handle lockh = { 0 };
         struct lov_stripe_md *lsm = NULL;
         int rc, len, flags;
@@ -1699,32 +1786,15 @@ int mds_validate_size(struct obd_device *obd, struct mds_body *body,
         if (!S_ISREG(inode->i_mode))
                 RETURN(0);
 
-        /* we update i_size/i_blocks only for writers */
-        if (!(mfd->mfd_mode & FMODE_WRITE))
-                RETURN(0);
-
-        /* we like when client reports actual i_size/i_blocks himself */
-        if (body->valid & OBD_MD_FLSIZE) {
-                LASSERT(body->valid & OBD_MD_FLBLOCKS);
-                CDEBUG(D_OTHER, "client reports "LPD64"/"LPD64" for "DLID4"\n",
-                       body->size, body->blocks, OLID4(&body->id1));
-                RETURN(0);
-        }
-
         /* we shouldn't fetch size from OSTes during recovery - deadlock */
-        if (obd->obd_recovering)
+        if (obd->obd_recovering) {
+                CERROR("size-on-mds has no support on OST yet\n");
                 RETURN(0);
+        }
 
-        DOWN_READ_I_ALLOC_SEM(inode);
-        if (atomic_read(&inode->i_writecount) > 1 
-                        || mds_inode_is_orphan(inode)) {
-                /* there is no need to update i_size/i_blocks on orphans.
-                 * also, if this is not last writer, then it doesn't make
-                 * sense to fetch i_size/i_blocks from OSSes */
-                UP_READ_I_ALLOC_SEM(inode);
+        /* if nobody modified attrs. we're lucky */
+        if (!mds_inode_has_old_attrs(inode))
                 RETURN(0);
-        }
-        UP_READ_I_ALLOC_SEM(inode);
 
         /* 1: client didn't send actual i_size/i_blocks
          * 2: we seem to be last writer
@@ -1771,17 +1841,32 @@ int mds_validate_size(struct obd_device *obd, struct mds_body *body,
                          LCK_PR, &flags, mds_extent_lock_callback,
                          ldlm_completion_ast, NULL, NULL,
                          sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
-        if (rc != 0) {
+        if (rc == -ENOENT) {
+                /* while we were enqueueing lock on OST, another thread
+                 * unlinked the file and started OST object destoying.
+                 * it's safe to return 0 here */
+                GOTO(cleanup, rc = 0);
+        } else if (rc != 0) {
                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
                 GOTO(cleanup, rc);
         }
 
-        body->size = lov_merge_size(lsm, 0);
-        body->blocks = lov_merge_blocks(lsm);
-        body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+        CDEBUG(D_INODE, "LOV reports "LPD64"/%lu for "DLID4" [%s%s%s]\n",
+               inode->i_size, inode->i_blocks, OLID4(&body->id1),
+               atomic_read(&inode->i_writecount) > 1 ? "U" : "",
+               mds_inode_has_old_attrs(inode) ? "D" : "",
+               mds_inode_is_orphan(inode) ? "O" : "");
 
-        CDEBUG(D_OTHER, "LOV reports "LPD64"/"LPD64" for "DLID4"\n",
-                        body->size, body->blocks, OLID4(&body->id1));
+        i_size_write(inode, lov_merge_size(lsm, 0));
+        inode->i_blocks = lov_merge_blocks(lsm);
+        LTIME_S(inode->i_mtime) = lov_merge_mtime(lsm, LTIME_S(inode->i_mtime));
+        iattr->ia_size = inode->i_size;
+        LTIME_S(iattr->ia_mtime) = LTIME_S(inode->i_mtime);
+        iattr->ia_valid |= ATTR_SIZE | ATTR_MTIME;
+        
+        DOWN_WRITE_I_ALLOC_SEM(inode);
+        mds_inode_unset_attrs_old(inode);
+        UP_WRITE_I_ALLOC_SEM(inode);
 
         obd_cancel(obd->u.mds.mds_dt_exp, lsm, LCK_PR, &lockh);
         
@@ -1841,23 +1926,17 @@ int mds_close(struct ptlrpc_request *req, int offset)
                 RETURN(-ESTALE);
         }
 
-        rc = mds_validate_size(obd, body, mfd);
-        LASSERT(rc == 0);
-
         inode = mfd->mfd_dentry->d_inode;
 
-        if (mfd->mfd_mode & FMODE_WRITE) {
-                /* we set i_size/i_blocks here, nobody will see
-                 * them until all write references are dropped.
-                 * btw, we hold one reference */
-                if (body->valid & OBD_MD_FLSIZE)
-                        i_size_write(inode, body->size);
-                if (body->valid & OBD_MD_FLBLOCKS)
-                        inode->i_blocks = body->blocks;
-        }
-
         /* child i_alloc_sem protects orphan_dec_test && is_orphan race */
         DOWN_WRITE_I_ALLOC_SEM(inode); /* mds_mfd_close drops this */
+
+        if (body->flags & MDS_BFLAG_DIRTY_EPOCH) {
+                /* the client modified data through the handle
+                 * we need to care about attrs. -bzzz */
+                mds_inode_set_attrs_old(inode);
+        }
+
         if (mds_inode_is_orphan(inode) && mds_orphan_open_count(inode) == 1) {
                 struct mds_body *rep_body;
 
@@ -1890,6 +1969,7 @@ int mds_close(struct ptlrpc_request *req, int offset)
         RETURN(0);
 }
 
+
 int mds_done_writing(struct ptlrpc_request *req, int offset)
 {
         struct mds_body *body;