Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / llite / file.c
index 4c16e1c..3429b28 100644 (file)
  */
 
 #define DEBUG_SUBSYSTEM S_LLITE
-
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_lite.h>
 #include <linux/obd_lov.h>      /* for lov_mds_md_size() in lov_setstripe() */
 #include <linux/random.h>
+#include <linux/pagemap.h>
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+#include <linux/lustre_compat25.h>
+#endif
 
 int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
 extern int ll_setattr(struct dentry *de, struct iattr *attr);
@@ -44,22 +47,25 @@ static int ll_mdc_close(struct lustre_handle *mdc_conn, struct inode *inode,
 
         /* Complete the open request and remove it from replay list */
         rc = mdc_close(&ll_i2sbi(inode)->ll_mdc_conn, inode->i_ino,
-                       inode->i_mode, &fd->fd_mdshandle, &req);
+                       inode->i_mode, &fd->fd_mds_och.och_fh, &req);
         if (rc)
                 CERROR("inode %lu close failed: rc = %d\n", inode->i_ino, rc);
 
-        imp = fd->fd_req->rq_import;
+        imp = fd->fd_mds_och.och_req->rq_import;
         LASSERT(imp != NULL);
         spin_lock_irqsave(&imp->imp_lock, flags);
 
-        DEBUG_REQ(D_HA, fd->fd_req, "matched open req %p", fd->fd_req);
+        DEBUG_REQ(D_HA, fd->fd_mds_och.och_req, "matched open req %p",
+                  fd->fd_mds_och.och_req);
 
         /* We held on to the request for replay until we saw a close for that
          * file.  Now that we've closed it, it gets replayed on the basis of
          * its transno only. */
-        fd->fd_req->rq_flags &= ~PTL_RPC_FL_REPLAY;
+        spin_lock (&fd->fd_mds_och.och_req->rq_lock);
+        fd->fd_mds_och.och_req->rq_replay = 0;
+        spin_unlock (&fd->fd_mds_och.och_req->rq_lock);
 
-        if (fd->fd_req->rq_transno) {
+        if (fd->fd_mds_och.och_req->rq_transno) {
                 /* This open created a file, so it needs replay as a
                  * normal transaction now.  Our reference to it now
                  * effectively owned by the imp_replay_list, and it'll
@@ -78,7 +84,7 @@ static int ll_mdc_close(struct lustre_handle *mdc_conn, struct inode *inode,
                  * the basis of that and we don't need to do anything
                  * magical here. */
                 if (!req->rq_transno) {
-                        req->rq_transno = fd->fd_req->rq_transno;
+                        req->rq_transno = fd->fd_mds_och.och_req->rq_transno;
                         ptlrpc_retain_replayable_request(req, imp);
                 }
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
@@ -92,14 +98,14 @@ static int ll_mdc_close(struct lustre_handle *mdc_conn, struct inode *inode,
                 /* No transno means that we can just drop our ref. */
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
         }
-        ptlrpc_req_finished(fd->fd_req);
+        ptlrpc_req_finished(fd->fd_mds_och.och_req);
 
         /* Do this after the fd_req->rq_transno check, because we don't want
          * to bounce off zero references. */
         ptlrpc_req_finished(req);
-        fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
+        fd->fd_mds_och.och_fh.cookie = DEAD_HANDLE_MAGIC;
         file->private_data = NULL;
-        kmem_cache_free(ll_file_data_slab, fd);
+        OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
 
         RETURN(-abs(rc));
 }
@@ -109,7 +115,7 @@ static int ll_mdc_close(struct lustre_handle *mdc_conn, struct inode *inode,
  * rarely check close errors and even if an error is returned they will not
  * re-try the close call.
  */
-static int ll_file_release(struct inode *inode, struct file *file)
+int ll_file_release(struct inode *inode, struct file *file)
 {
         struct ll_file_data *fd;
         struct obdo oa;
@@ -119,6 +125,12 @@ static int ll_file_release(struct inode *inode, struct file *file)
         int rc = 0, rc2;
 
         ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
+               inode->i_generation, inode);
+
+        /* don't do anything for / */
+        if (inode->i_sb->s_root == file->f_dentry)
+                RETURN(0);
 
         fd = (struct ll_file_data *)file->private_data;
         if (!fd) /* no process opened the file after an mcreate */
@@ -126,22 +138,24 @@ static int ll_file_release(struct inode *inode, struct file *file)
 
         /* we might not be able to get a valid handle on this file
          * again so we really want to flush our write cache.. */
-        filemap_fdatasync(inode->i_mapping);
-        filemap_fdatawait(inode->i_mapping);
+        if (S_ISREG(inode->i_mode)) {
+                filemap_fdatasync(inode->i_mapping);
+                filemap_fdatawait(inode->i_mapping);
 
-        if (lsm != NULL) {
-                memset(&oa, 0, sizeof(oa));
-                oa.o_id = lsm->lsm_object_id;
-                oa.o_mode = S_IFREG;
-                oa.o_valid = OBD_MD_FLTYPE | OBD_MD_FLID;
+                if (lsm != NULL) {
+                        memset(&oa, 0, sizeof(oa));
+                        oa.o_id = lsm->lsm_object_id;
+                        oa.o_mode = S_IFREG;
+                        oa.o_valid = OBD_MD_FLTYPE | OBD_MD_FLID;
 
-                memcpy(&oa.o_inline, fd->fd_ostdata, FD_OSTDATA_SIZE);
-                oa.o_valid |= OBD_MD_FLHANDLE;
+                        memcpy(&oa.o_inline, &fd->fd_ost_och, FD_OSTDATA_SIZE);
+                        oa.o_valid |= OBD_MD_FLHANDLE;
 
-                rc = obd_close(&sbi->ll_osc_conn, &oa, lsm, NULL);
-                if (rc)
-                        CERROR("inode %lu object close failed: rc = %d\n",
-                               inode->i_ino, rc);
+                        rc = obd_close(&sbi->ll_osc_conn, &oa, lsm, NULL);
+                        if (rc)
+                                CERROR("inode %lu object close failed: rc = "
+                                       "%d\n", inode->i_ino, rc);
+                }
         }
 
         rc2 = ll_mdc_close(&sbi->ll_mdc_conn, inode, file);
@@ -155,20 +169,24 @@ static int ll_local_open(struct file *file, struct lookup_intent *it)
 {
         struct ptlrpc_request *req = it->it_data;
         struct ll_file_data *fd;
-        struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
+        struct mds_body *body;
         ENTRY;
 
+        body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body));
+        LASSERT (body != NULL);                 /* reply already checked out */
+        LASSERT_REPSWABBED (req, 1);            /* and swabbed down */
+
         LASSERT(!file->private_data);
 
-        fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
+        OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
         /* We can't handle this well without reorganizing ll_file_open and
          * ll_mdc_close, so don't even try right now. */
         LASSERT(fd != NULL);
 
         memset(fd, 0, sizeof(*fd));
 
-        memcpy(&fd->fd_mdshandle, &body->handle, sizeof(body->handle));
-        fd->fd_req = it->it_data;
+        memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
+        fd->fd_mds_och.och_req = it->it_data;
         file->private_data = fd;
 
         RETURN(0);
@@ -189,16 +207,13 @@ static int ll_osc_open(struct lustre_handle *conn, struct inode *inode,
         oa->o_mode = S_IFREG;
         oa->o_valid = (OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLBLOCKS |
                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-        rc = obd_open(conn, oa, lsm, NULL);
+        rc = obd_open(conn, oa, lsm, NULL, &fd->fd_ost_och);
         if (rc)
                 GOTO(out, rc);
 
         file->f_flags &= ~O_LOV_DELAY_CREATE;
-        obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS | OBD_MD_FLMTIME |
-                      OBD_MD_FLCTIME);
-
-        if (oa->o_valid & OBD_MD_FLHANDLE)
-                memcpy(fd->fd_ostdata, obdo_handle(oa), FD_OSTDATA_SIZE);
+        obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
+                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME);
 
         EXIT;
 out:
@@ -219,6 +234,7 @@ static int ll_create_obj(struct lustre_handle *conn, struct inode *inode,
         struct lov_mds_md *lmm = NULL;
         struct obdo *oa;
         struct iattr iattr;
+        struct mdc_op_data op_data;
         int rc, err, lmm_size = 0;;
         ENTRY;
 
@@ -230,8 +246,7 @@ static int ll_create_obj(struct lustre_handle *conn, struct inode *inode,
         oa->o_id = inode->i_ino;
         /* Keep these 0 for now, because chown/chgrp does not change the
          * ownership on the OST, and we don't want to allow BA OST NFS
-         * users to access these objects by mistake.
-         */
+         * users to access these objects by mistake. */
         oa->o_uid = 0;
         oa->o_gid = 0;
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE |
@@ -247,6 +262,7 @@ static int ll_create_obj(struct lustre_handle *conn, struct inode *inode,
                 }
                 GOTO(out_oa, rc);
         }
+        obdo_to_inode(inode, oa, OBD_MD_FLBLKSZ);
 
         LASSERT(lsm && lsm->lsm_object_id);
         rc = obd_packmd(conn, &lmm, lsm);
@@ -258,11 +274,14 @@ static int ll_create_obj(struct lustre_handle *conn, struct inode *inode,
         /* Save the stripe MD with this file on the MDS */
         memset(&iattr, 0, sizeof(iattr));
         iattr.ia_valid = ATTR_FROM_OPEN;
-        rc = mdc_setattr(&ll_i2sbi(inode)->ll_mdc_conn, inode, &iattr,
-                         lmm, lmm_size, &req);
+
+        ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0);
+
+        rc = mdc_setattr(&ll_i2sbi(inode)->ll_mdc_conn, &op_data,
+                         &iattr, lmm, lmm_size, &req);
         ptlrpc_req_finished(req);
 
-        obd_free_wiremd(conn, &lmm);
+        obd_free_diskmd (conn, &lmm);
 
         /* If we couldn't complete mdc_open() and store the stripe MD on the
          * MDS, we need to destroy the objects now or they will be leaked.
@@ -273,6 +292,7 @@ static int ll_create_obj(struct lustre_handle *conn, struct inode *inode,
                 GOTO(out_destroy, rc);
         }
         lli->lli_smd = lsm;
+        lli->lli_maxbytes = lsm->lsm_maxbytes;
 
         EXIT;
 out_oa:
@@ -308,7 +328,7 @@ out_destroy:
  */
 extern int ll_it_open_error(int phase, struct lookup_intent *it);
 
-static int ll_file_open(struct inode *inode, struct file *file)
+int ll_file_open(struct inode *inode, struct file *file)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_inode_info *lli = ll_i2info(inode);
@@ -318,7 +338,13 @@ static int ll_file_open(struct inode *inode, struct file *file)
         int rc = 0;
         ENTRY;
 
-        CDEBUG(D_VFSTRACE, "VFS Op\n");
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
+               inode->i_generation, inode);
+
+        /* don't do anything for / */
+        if (inode->i_sb->s_root == file->f_dentry)
+                RETURN(0);
+
         LL_GET_INTENT(file->f_dentry, it);
         rc = ll_it_open_error(IT_OPEN_OPEN, it);
         if (rc)
@@ -328,7 +354,10 @@ static int ll_file_open(struct inode *inode, struct file *file)
         if (rc)
                 LBUG();
 
-        mdc_set_open_replay_data((struct ll_file_data *)file->private_data);
+        mdc_set_open_replay_data(&((struct ll_file_data *)
+                                   file->private_data)->fd_mds_och);
+        if (!S_ISREG(inode->i_mode))
+                RETURN(0);
 
         lsm = lli->lli_smd;
         if (lsm == NULL) {
@@ -364,69 +393,86 @@ static int ll_file_open(struct inode *inode, struct file *file)
  * really does the getattr on the inode and updates its fields
  */
 int ll_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm,
-                     char *ostdata)
+                     void *ostdata)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ptlrpc_request_set *set;
         struct obdo oa;
+        int bef, aft;
+        unsigned long before, after;
         int rc;
         ENTRY;
 
         LASSERT(lsm);
         LASSERT(sbi);
+        LASSERT(lli);
 
         memset(&oa, 0, sizeof oa);
         oa.o_id = lsm->lsm_object_id;
         oa.o_mode = S_IFREG;
         oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+                OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
+                OBD_MD_FLCTIME;
 
         if (ostdata != NULL) {
                 memcpy(&oa.o_inline, ostdata, FD_OSTDATA_SIZE);
                 oa.o_valid |= OBD_MD_FLHANDLE;
         }
 
-        rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
-        if (rc)
-                RETURN(rc);
-
-        obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                           OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-
-        CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lu\n", lsm->lsm_object_id,
-               inode->i_size, inode->i_size);
-        RETURN(0);
-}
-
-/*
- * we've acquired a lock and need to see if we should perform a getattr
- * to update the file size that may have been updated by others that had
- * their locks canceled.
- */
-static int ll_size_validate(struct inode *inode, struct lov_stripe_md *lsm,
-                            char *ostdata, struct ldlm_extent *extent)
-{
-        struct ll_inode_info *lli = ll_i2info(inode);
-        int rc = 0;
-        ENTRY;
-
-        if (test_bit(LLI_F_DID_GETATTR, &lli->lli_flags))
+        /* getattr can race with writeback.  we don't want to trust a getattr
+         * that doesn't include the writeback of our farthest cached pages
+         * that it raced with. */
+        do {
+                bef = ll_farthest_dirty(&lli->lli_dirty, &before);
+#if 0
+                rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
+#else
+                set = ptlrpc_prep_set ();
+                if (set == NULL) {
+                        CERROR ("ENOMEM allocing request set\n");
+                        rc = -ENOMEM;
+                } else {
+                        rc = obd_getattr_async(&sbi->ll_osc_conn, &oa, lsm, set);
+                        if (rc == 0)
+                                rc = ptlrpc_set_wait (set);
+                        ptlrpc_set_destroy (set);
+                }
+#endif
+                if (rc)
+                        RETURN(rc);
+
+                aft = ll_farthest_dirty(&lli->lli_dirty, &after);
+                CDEBUG(D_INODE, " %d,%lu -> %d,%lu\n", bef, before, aft, after);
+        } while (bef == 0 &&
+                 (aft != 0 || after < before) &&
+                 oa.o_size < ((u64)before + 1) << PAGE_CACHE_SHIFT);
+
+        obdo_to_inode(inode, &oa, (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
+                                   OBD_MD_FLMTIME | OBD_MD_FLCTIME));
+        if (inode->i_blksize < PAGE_CACHE_SIZE)
+                inode->i_blksize = PAGE_CACHE_SIZE;
+
+        /* make sure getattr doesn't return a size that causes writeback
+         * to forget about cached writes */
+        if ((aft == 0) && oa.o_size < ((u64)after + 1) << PAGE_CACHE_SHIFT) {
+                CDEBUG(D_INODE, "cached at %lu, keeping %llu i_size instead "
+                                "of oa "LPU64"\n", after, inode->i_size,
+                                oa.o_size);
                 RETURN(0);
-
-        down(&lli->lli_getattr_sem);
-
-        if (!test_bit(LLI_F_DID_GETATTR, &lli->lli_flags)) {
-                rc = ll_inode_getattr(inode, lsm, ostdata);
-                if ( rc == 0 ) 
-                        set_bit(LLI_F_DID_GETATTR, &lli->lli_flags);
         }
 
-        up(&lli->lli_getattr_sem);
-        RETURN(rc);
+        obdo_to_inode(inode, &oa, OBD_MD_FLSIZE);
+
+        CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lu blksize %lu\n",
+               lsm->lsm_object_id, inode->i_size, inode->i_size,
+               inode->i_blksize);
+        RETURN(0);
 }
 
 /*
  * some callers, notably truncate, really don't want i_size set based
- * on the the size returned by the getattr, or lock acquisition in 
+ * on the the size returned by the getattr, or lock acquisition in
  * the future.
  */
 int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode,
@@ -438,14 +484,14 @@ int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode,
         int rc, flags = 0;
         ENTRY;
 
-        LASSERT(lockh->addr == 0 && lockh->cookie == 0);
+        LASSERT(lockh->cookie == 0);
 
         /* XXX phil: can we do this?  won't it screw the file size up? */
         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
             (sbi->ll_flags & LL_SBI_NOLCK))
                 RETURN(0);
 
-        CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
+        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
                inode->i_ino, extent->start, extent->end);
 
         rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, extent,
@@ -454,30 +500,53 @@ int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode,
 
         RETURN(rc);
 }
+
 /*
- * this grabs a lock and manually implements behaviour that makes it look
- * like the OST is returning the file size with each lock acquisition
+ * this grabs a lock and manually implements behaviour that makes it look like
+ * the OST is returning the file size with each lock acquisition.
  */
 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                    struct lov_stripe_md *lsm,
                    int mode, struct ldlm_extent *extent,
                    struct lustre_handle *lockh)
 {
-        int rc;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ldlm_extent size_lock;
+        struct lustre_handle match_lockh = {0};
+        int flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+        int rc, matched;
         ENTRY;
 
         rc = ll_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh);
+        if (rc != ELDLM_OK)
+                RETURN(rc);
 
-        if (rc == ELDLM_OK) {
-                rc = ll_size_validate(inode, lsm, fd ? fd->fd_ostdata : NULL,
-                        extent);
-                if ( rc != 0 ) {
-                        ll_extent_unlock(fd, inode, lsm, mode, lockh);
-                        rc = ELDLM_GETATTR_ERROR;
-                }
+        if (test_bit(LLI_F_HAVE_SIZE_LOCK, &lli->lli_flags))
+                RETURN(0);
+
+        rc = ll_inode_getattr(inode, lsm, fd ? &fd->fd_ost_och : NULL);
+        if (rc) {
+                ll_extent_unlock(fd, inode, lsm, mode, lockh);
+                RETURN(rc);
         }
 
-        RETURN(rc);
+        size_lock.start = inode->i_size;
+        size_lock.end = OBD_OBJECT_EOF;
+
+        /* XXX I bet we should be checking the lock ignore flags.. */
+        matched = obd_match(&ll_i2sbi(inode)->ll_osc_conn, lsm, LDLM_EXTENT,
+                       &size_lock, sizeof(size_lock), LCK_PR, &flags,
+                       &match_lockh);
+
+        /* hey, alright, we hold a size lock that covers the size we
+         * just found, its not going to change for a while.. */
+        if (matched == 1) {
+                set_bit(LLI_F_HAVE_SIZE_LOCK, &lli->lli_flags);
+                obd_cancel(&ll_i2sbi(inode)->ll_osc_conn, lsm, LCK_PR,
+                           &match_lockh);
+        }
+
+        RETURN(0);
 }
 
 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
@@ -513,16 +582,13 @@ static inline void ll_remove_suid(struct inode *inode)
         }
 }
 
+#if 0
 static void ll_update_atime(struct inode *inode)
 {
 #ifdef USE_ATIME
         struct iattr attr;
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        attr.ia_atime = CURRENT_TIME;
-#else
-        attr.ia_atime = CURRENT_TIME.tv_sec;
-#endif
+        attr.ia_atime = LTIME_S(CURRENT_TIME);
         attr.ia_valid = ATTR_ATIME;
 
         if (inode->i_atime == attr.ia_atime) return;
@@ -536,19 +602,170 @@ static void ll_update_atime(struct inode *inode)
         inode->i_atime = CURRENT_TIME;
 #endif
 }
+#endif
+
+/*
+ * flush the page cache for an extent as its canceled.  when we're on an
+ * lov we get a lock cancelation for each of the obd locks under the lov
+ * so we have to map the obd's region back onto the stripes in the file
+ * that it held.
+ *
+ * no one can dirty the extent until we've finished our work and they
+ * can enqueue another lock.
+ *
+ * XXX this could be asking the inode's dirty tree for info
+ */
+void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
+                              struct ldlm_lock *lock)
+{
+        struct ldlm_extent *extent = &lock->l_extent;
+        unsigned long start, end, count, skip, i, j;
+        struct page *page;
+        int ret;
+        ENTRY;
+
+        CDEBUG(D_INODE, "obdo %lu inode %p ["LPU64"->"LPU64"] size: %llu\n",
+               inode->i_ino, inode, extent->start, extent->end, inode->i_size);
+
+        start = extent->start >> PAGE_CACHE_SHIFT;
+        count = ~0;
+        skip = 0;
+        end = (extent->end >> PAGE_CACHE_SHIFT) + 1;
+        if ((end << PAGE_CACHE_SHIFT) < extent->end)
+                end = ~0;
+        if (lsm->lsm_stripe_count > 1) {
+                struct {
+                        char name[16];
+                        struct ldlm_lock *lock;
+                        struct lov_stripe_md *lsm;
+                } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
+                __u32 stripe;
+                __u32 vallen = sizeof(stripe);
+                int rc;
+
+                /* get our offset in the lov */
+                rc = obd_get_info(ll_i2obdconn(inode), sizeof(key),
+                                  &key, &vallen, &stripe);
+                if (rc != 0) {
+                        CERROR("obd_get_info: rc = %d\n", rc);
+                        LBUG();
+                }
+                LASSERT(stripe < lsm->lsm_stripe_count);
+
+                count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
+                skip = (lsm->lsm_stripe_count - 1) * count;
+                start += (start/count * skip) + (stripe * count);
+                if (end != ~0)
+                        end += (end/count * skip) + (stripe * count);
+        }
+
+        i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT;
+        if (end >= i)
+                clear_bit(LLI_F_HAVE_SIZE_LOCK, &(ll_i2info(inode)->lli_flags));
+        if (i < end)
+                end = i;
+
+        CDEBUG(D_INODE, "start: %lu j: %lu count: %lu skip: %lu end: %lu\n",
+               start, start % count, count, skip, end);
+
+        /* start writeback on dirty pages in the extent when its PW */
+        for (i = start, j = start % count;
+                        lock->l_granted_mode == LCK_PW && i < end; j++, i++) {
+                if (j == count) {
+                        i += skip;
+                        j = 0;
+                }
+                /* its unlikely, but give us a chance to bail when we're out */
+                PGCACHE_WRLOCK(inode->i_mapping);
+                if (list_empty(&inode->i_mapping->dirty_pages)) {
+                        CDEBUG(D_INODE, "dirty list empty\n");
+                        PGCACHE_WRUNLOCK(inode->i_mapping);
+                        break;
+                }
+                PGCACHE_WRUNLOCK(inode->i_mapping);
+
+                if (need_resched())
+                        schedule();
+
+                page = find_get_page(inode->i_mapping, i);
+                if (page == NULL)
+                        continue;
+                if (!PageDirty(page) || TryLockPage(page)) {
+                        page_cache_release(page);
+                        continue;
+                }
+                if (PageDirty(page)) {
+                        CDEBUG(D_INODE, "writing page %p\n", page);
+                        PGCACHE_WRLOCK(inode->i_mapping);
+                        list_del(&page->list);
+                        list_add(&page->list, &inode->i_mapping->locked_pages);
+                        PGCACHE_WRUNLOCK(inode->i_mapping);
+
+                        /* this writepage might write out pages outside
+                         * this extent, but that's ok, the pages are only
+                         * still dirty because a lock still covers them */
+                        ClearPageDirty(page);
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+                        ret = inode->i_mapping->a_ops->writepage(page);
+#else
+                        ret = inode->i_mapping->a_ops->writepage(page, NULL);
+#endif
+                        if (ret != 0)
+                                unlock_page(page);
+                } else {
+                        unlock_page(page);
+                }
+                page_cache_release(page);
+
+        }
+
+        /* our locks are page granular thanks to osc_enqueue, we invalidate the
+         * whole page. */
+        LASSERT((extent->start & ~PAGE_CACHE_MASK) == 0);
+        LASSERT(((extent->end+1) & ~PAGE_CACHE_MASK) == 0);
+        for (i = start, j = start % count ; i < end ; j++, i++) {
+                if ( j == count ) {
+                        i += skip;
+                        j = 0;
+                }
+                PGCACHE_WRLOCK(inode->i_mapping);
+                if (list_empty(&inode->i_mapping->dirty_pages) &&
+                     list_empty(&inode->i_mapping->clean_pages) &&
+                     list_empty(&inode->i_mapping->locked_pages)) {
+                        CDEBUG(D_INODE, "nothing left\n");
+                        PGCACHE_WRUNLOCK(inode->i_mapping);
+                        break;
+                }
+                PGCACHE_WRUNLOCK(inode->i_mapping);
+                if (need_resched())
+                        schedule();
+                page = find_get_page(inode->i_mapping, i);
+                if (page == NULL)
+                        continue;
+                CDEBUG(D_INODE, "dropping page %p at %lu\n", page, page->index);
+                lock_page(page);
+                if (page->mapping) /* might have raced */
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+                        truncate_complete_page(page);
+#else
+                        truncate_complete_page(page->mapping, page);
+#endif                
+                unlock_page(page);
+                page_cache_release(page);
+        }
+        EXIT;
+}
 
 int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                      void *data, int flag)
 {
         struct inode *inode = data;
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lustre_handle lockh = { 0, 0 };
+        struct lustre_handle lockh = { 0 };
         int rc;
         ENTRY;
-        CDEBUG(D_VFSTRACE, "VFS Op\n");
 
-        if (inode == NULL)
-                LBUG();
+        LASSERT(inode != NULL);
 
         switch (flag) {
         case LDLM_CB_BLOCKING:
@@ -562,11 +779,10 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                  * could know to write-back or simply throw away the pages
                  * based on if the cancel comes from a desire to, say,
                  * read or truncate.. */
-                CDEBUG(D_INODE, "invalidating obdo/inode %lu\n", inode->i_ino);
-                filemap_fdatasync(inode->i_mapping);
-                filemap_fdatawait(inode->i_mapping);
-                clear_bit(LLI_F_DID_GETATTR, &lli->lli_flags);
-                truncate_inode_pages(inode->i_mapping, 0);
+                LASSERT((unsigned long)inode > 0x1000);
+                LASSERT((unsigned long)lli > 0x1000);
+                LASSERT((unsigned long)lli->lli_smd > 0x1000);
+                ll_pgcache_remove_extent(inode, lli->lli_smd, lock);
                 break;
         default:
                 LBUG();
@@ -582,27 +798,29 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         struct inode *inode = filp->f_dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
-        struct lustre_handle lockh = { 0, 0 };
+        struct lustre_handle lockh = { 0 };
         struct ll_read_extent rextent;
         ldlm_error_t err;
         ssize_t retval;
         ENTRY;
-        CDEBUG(D_VFSTRACE, "VFS Op\n");
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
+               inode->i_ino, inode->i_generation, inode, count, *ppos);
 
         /* "If nbyte is 0, read() will return 0 and have no other results."
          *                      -- Single Unix Spec */
         if (count == 0)
                 RETURN(0);
 
+        /* grab a -> eof extent to push extending writes out of node's caches
+         * so we can see them at the getattr after lock acquisition.  this will
+         * turn into a seperate [*ppos + count, EOF] 'size intent' lock attempt
+         * in the future. */
         rextent.re_extent.start = *ppos;
-        rextent.re_extent.end = *ppos + count - 1;
+        rextent.re_extent.end = OBD_OBJECT_EOF;
 
-        err = ll_extent_lock(fd, inode, lsm, 
-                             LCK_PR, &rextent.re_extent, &lockh);
-        if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
-                retval = -ENOLCK;
-                RETURN(retval);
-        }
+        err = ll_extent_lock(fd, inode, lsm, LCK_PR, &rextent.re_extent,&lockh);
+        if (err != ELDLM_OK)
+                RETURN(-ENOLCK);
 
         /* XXX tell ll_readpage what pages have a PR lock.. */
         rextent.re_task = current;
@@ -618,9 +836,6 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         list_del(&rextent.re_lli_item);
         spin_unlock(&lli->lli_read_extent_lock);
 
-        if (retval > 0)
-                ll_update_atime(inode);
-
         /* XXX errors? */
         ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
         RETURN(retval);
@@ -634,40 +849,72 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
 {
         struct ll_file_data *fd = file->private_data;
         struct inode *inode = file->f_dentry->d_inode;
-        struct lustre_handle lockh = { 0, 0 };
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+        struct lustre_handle lockh = { 0 };
         struct ldlm_extent extent;
+        loff_t maxbytes = ll_file_maxbytes(inode);
         ldlm_error_t err;
         ssize_t retval;
+        char should_validate = 1;
         ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
+               inode->i_ino, inode->i_generation, inode, count, *ppos);
+
+        /*
+         * sleep doing some writeback work of this mount's dirty data
+         * if the VM thinks we're low on memory.. other dirtying code
+         * paths should think about doing this, too, but they should be
+         * careful not to hold locked pages while they do so.  like
+         * ll_prepare_write.  *cough*
+         */
+        LL_CHECK_DIRTY(inode->i_sb);
 
         /* POSIX, but surprised the VFS doesn't check this already */
         if (count == 0)
                 RETURN(0);
 
-        CDEBUG(D_VFSTRACE, "VFS Op\n");
-        if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
+        if (file->f_flags & O_APPEND) {
                 extent.start = 0;
                 extent.end = OBD_OBJECT_EOF;
         } else  {
                 extent.start = *ppos;
                 extent.end = *ppos + count - 1;
+                /* we really don't care what i_size is if we're doing
+                 * fully page aligned writes */
+                if ((*ppos & ~PAGE_CACHE_MASK) == 0 &&
+                    (count & ~PAGE_CACHE_MASK) == 0)
+                        should_validate = 0;
         }
 
-        err = ll_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh);
-        if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
-                retval = -ENOLCK;
-                RETURN(retval);
-        }
-
-        if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND)
+        if (should_validate)
+                err = ll_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh);
+        else
+                err = ll_extent_lock_no_validate(fd, inode, lsm, LCK_PW,
+                                                 &extent, &lockh);
+        if (err != ELDLM_OK)
+                RETURN(-ENOLCK);
+
+        /* this is ok, g_f_w will overwrite this under i_sem if it races
+         * with a local truncate, it just makes our maxbyte checking easier */
+        if (file->f_flags & O_APPEND)
                 *ppos = inode->i_size;
 
+        if (*ppos >= maxbytes) {
+                if (count || *ppos > maxbytes) {
+                        send_sig(SIGXFSZ, current, 0);
+                        GOTO(out, retval = -EFBIG);
+                }
+        }
+        if (*ppos + count > maxbytes)
+                count = maxbytes - *ppos;
+
         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                inode->i_ino, count, *ppos);
 
+        /* generic_file_write handles O_APPEND after getting i_sem */
         retval = generic_file_write(file, buf, count, ppos);
 
+out:
         /* XXX errors? */
         ll_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
         RETURN(retval);
@@ -686,7 +933,7 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
         lsm = lli->lli_smd;
         if (lsm) {
                 up(&lli->lli_open_sem);
-                CERROR("stripe already set for ino %lu\n", inode->i_ino);
+                CERROR("stripe already exists for ino %lu\n", inode->i_ino);
                 /* If we haven't already done the open, do so now */
                 if (file->f_flags & O_LOV_DELAY_CREATE) {
                         int rc2 = ll_osc_open(conn, inode, file, lsm);
@@ -694,7 +941,7 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
                                 RETURN(rc2);
                 }
 
-                RETURN(-EALREADY);
+                RETURN(-EEXIST);
         }
 
         rc = obd_iocontrol(LL_IOC_LOV_SETSTRIPE, conn, 0, &lsm, (void *)arg);
@@ -730,8 +977,8 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
         struct ll_file_data *fd = file->private_data;
         struct lustre_handle *conn;
         int flags;
-
-        CDEBUG(D_VFSTRACE, "VFS Op\n");
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%u\n", inode->i_ino,
+               inode->i_generation, inode, cmd);
 
         if ((cmd & 0xffffff00) == ((int)'T') << 8) /* tty ioctls */
                 return -ENOTTY;
@@ -780,19 +1027,19 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_file_data *fd = file->private_data;
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        struct lustre_handle lockh = {0, 0};
+        struct lustre_handle lockh = {0};
         loff_t retval;
         ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),to=%llu\n", inode->i_ino,
+               inode->i_generation, inode,
+               offset + ((origin==2) ? inode->i_size : file->f_pos));
 
-        CDEBUG(D_VFSTRACE, "VFS Op\n");
         if (origin == 2) { /* SEEK_END */
                 ldlm_error_t err;
                 struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
                 err = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
-                if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
-                        retval = -ENOLCK;
-                        RETURN(retval);
-                }
+                if (err != ELDLM_OK)
+                        RETURN(-ENOLCK);
 
                 offset += inode->i_size;
         } else if (origin == 1) { /* SEEK_CUR */
@@ -800,7 +1047,7 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
         }
 
         retval = -EINVAL;
-        if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
+        if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
                 if (offset != file->f_pos) {
                         file->f_pos = offset;
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
@@ -819,7 +1066,10 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
 int ll_fsync(struct file *file, struct dentry *dentry, int data)
 {
         int ret;
+        struct inode *inode = dentry->d_inode;
         ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
+               inode->i_generation, inode);
 
         /*
          * filemap_fdata{sync,wait} are also called at PW lock cancelation so
@@ -837,14 +1087,15 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
 int ll_inode_revalidate(struct dentry *dentry)
 {
         struct inode *inode = dentry->d_inode;
-        struct lov_stripe_md *lsm;
+        struct lov_stripe_md *lsm = NULL;
         ENTRY;
 
-        CDEBUG(D_VFSTRACE, "VFS Op\n");
         if (!inode) {
                 CERROR("REPORT THIS LINE TO PETER\n");
                 RETURN(0);
         }
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
+               inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
 
         /* this is very tricky.  it is unsafe to call ll_have_md_lock
            when we have a referenced lock: because it may cause an RPC
@@ -855,37 +1106,67 @@ int ll_inode_revalidate(struct dentry *dentry)
             !ll_have_md_lock(dentry)) {
                 struct ptlrpc_request *req = NULL;
                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
+                struct ll_fid fid;
                 struct mds_body *body;
+                struct lov_mds_md *lmm;
                 unsigned long valid = 0;
-                int datalen = 0, rc;
+                int eadatalen = 0, rc;
 
                 /* Why don't we update all valid MDS fields here, if we're
                  * doing an RPC anyways?  -phil */
                 if (S_ISREG(inode->i_mode)) {
-                        datalen = obd_size_wiremd(&sbi->ll_osc_conn, NULL);
+                        eadatalen = obd_size_diskmd(&sbi->ll_osc_conn, NULL);
                         valid |= OBD_MD_FLEASIZE;
                 }
-                rc = mdc_getattr(&sbi->ll_mdc_conn, inode->i_ino,
-                                 inode->i_mode, valid, datalen, &req);
+                ll_inode2fid(&fid, inode);
+                rc = mdc_getattr(&sbi->ll_mdc_conn, &fid,
+                                 valid, eadatalen, &req);
                 if (rc) {
                         CERROR("failure %d inode %lu\n", rc, inode->i_ino);
-                        ptlrpc_req_finished(req);
                         RETURN(-abs(rc));
                 }
 
-                body = lustre_msg_buf(req->rq_repmsg, 0);
+                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
+                LASSERT (body != NULL);         /* checked by mdc_getattr() */
+                LASSERT_REPSWABBED (req, 0);    /* swabbed by mdc_getattr() */
 
                 if (S_ISREG(inode->i_mode) &&
-                    body->valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) {
+                    (body->valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))) {
                         CERROR("MDS sent back size for regular file\n");
                         body->valid &= ~(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
                 }
 
-                if (body->valid & OBD_MD_FLEASIZE)
-                        ll_update_inode(inode, body,
-                                        lustre_msg_buf(req->rq_repmsg, 1));
-                else
-                        ll_update_inode(inode, body, NULL);
+                /* XXX Too paranoid? */
+                if ((body->valid ^ valid) & OBD_MD_FLEASIZE)
+                        CERROR("Asked for %s eadata but got %s\n",
+                               (valid & OBD_MD_FLEASIZE) ? "some" : "no",
+                               (body->valid & OBD_MD_FLEASIZE) ? "some":"none");
+
+                if (S_ISREG(inode->i_mode) &&
+                    (body->valid & OBD_MD_FLEASIZE)) {
+                        if (body->eadatasize == 0) { /* no EA data */
+                                CERROR("OBD_MD_FLEASIZE set but no data\n");
+                                RETURN(-EPROTO);
+                        }
+                        /* Only bother with this if inode's lsm not set? */
+                        lmm = lustre_msg_buf(req->rq_repmsg,1,body->eadatasize);
+                        LASSERT(lmm != NULL);       /* mdc_getattr() checked */
+                        LASSERT_REPSWABBED(req, 1); /* mdc_getattr() swabbed */
+
+                        rc = obd_unpackmd (&sbi->ll_osc_conn,
+                                           &lsm, lmm, body->eadatasize);
+                        if (rc < 0) {
+                                CERROR("Error %d unpacking eadata\n", rc);
+                                ptlrpc_req_finished(req);
+                                RETURN(rc);
+                        }
+                        LASSERT(rc >= sizeof (*lsm));
+                }
+
+                ll_update_inode(inode, body, lsm);
+                if (lsm != NULL && ll_i2info(inode)->lli_smd != lsm)
+                        obd_free_memmd(&sbi->ll_osc_conn, &lsm);
+
                 ptlrpc_req_finished(req);
         }
 
@@ -901,12 +1182,12 @@ int ll_inode_revalidate(struct dentry *dentry)
          */
         {
                 struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
-                struct lustre_handle lockh = {0, 0};
+                struct lustre_handle lockh = {0};
                 ldlm_error_t err;
 
                 err = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh);
-                if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED )
-                        RETURN(-abs(err)); /* XXX can't be right */
+                if (err != ELDLM_OK)
+                        RETURN(err);
 
                 ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
         }