Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / llite / file.c
index ff5d1d6..4c16e1c 100644 (file)
@@ -124,6 +124,11 @@ static int ll_file_release(struct inode *inode, struct file *file)
         if (!fd) /* no process opened the file after an mcreate */
                 RETURN(rc = 0);
 
+        /* we might not be able to get a valid handle on this file
+         * again so we really want to flush our write cache.. */
+        filemap_fdatasync(inode->i_mapping);
+        filemap_fdatawait(inode->i_mapping);
+
         if (lsm != NULL) {
                 memset(&oa, 0, sizeof(oa));
                 oa.o_id = lsm->lsm_object_id;
@@ -182,17 +187,17 @@ static int ll_osc_open(struct lustre_handle *conn, struct inode *inode,
                 RETURN(-ENOMEM);
         oa->o_id = lsm->lsm_object_id;
         oa->o_mode = S_IFREG;
-        oa->o_valid = (OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                       OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+        oa->o_valid = (OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLBLOCKS |
+                       OBD_MD_FLMTIME | OBD_MD_FLCTIME);
         rc = obd_open(conn, oa, lsm, NULL);
         if (rc)
                 GOTO(out, rc);
 
         file->f_flags &= ~O_LOV_DELAY_CREATE;
-        obdo_to_inode(inode, oa, (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                                  OBD_MD_FLMTIME | OBD_MD_FLCTIME));
+        obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS | OBD_MD_FLMTIME |
+                      OBD_MD_FLCTIME);
 
-        if (oa->o_valid |= OBD_MD_FLHANDLE)
+        if (oa->o_valid & OBD_MD_FLHANDLE)
                 memcpy(fd->fd_ostdata, obdo_handle(oa), FD_OSTDATA_SIZE);
 
         EXIT;
@@ -355,85 +360,140 @@ static int ll_file_open(struct inode *inode, struct file *file)
         return rc;
 }
 
-int ll_size_lock(struct inode *inode, struct lov_stripe_md *lsm, obd_off start,
-                 int mode, struct lustre_handle *lockh)
+/*
+ * really does the getattr on the inode and updates its fields
+ */
+int ll_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm,
+                     char *ostdata)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct obdo oa;
+        int rc;
+        ENTRY;
+
+        LASSERT(lsm);
+        LASSERT(sbi);
+
+        memset(&oa, 0, sizeof oa);
+        oa.o_id = lsm->lsm_object_id;
+        oa.o_mode = S_IFREG;
+        oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
+                OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+        if (ostdata != NULL) {
+                memcpy(&oa.o_inline, ostdata, FD_OSTDATA_SIZE);
+                oa.o_valid |= OBD_MD_FLHANDLE;
+        }
+
+        rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
+        if (rc)
+                RETURN(rc);
+
+        obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                           OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+
+        CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lu\n", lsm->lsm_object_id,
+               inode->i_size, inode->i_size);
+        RETURN(0);
+}
+
+/*
+ * we've acquired a lock and need to see if we should perform a getattr
+ * to update the file size that may have been updated by others that had
+ * their locks canceled.
+ */
+static int ll_size_validate(struct inode *inode, struct lov_stripe_md *lsm,
+                            char *ostdata, struct ldlm_extent *extent)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        int rc = 0;
+        ENTRY;
+
+        if (test_bit(LLI_F_DID_GETATTR, &lli->lli_flags))
+                RETURN(0);
+
+        down(&lli->lli_getattr_sem);
+
+        if (!test_bit(LLI_F_DID_GETATTR, &lli->lli_flags)) {
+                rc = ll_inode_getattr(inode, lsm, ostdata);
+                if ( rc == 0 ) 
+                        set_bit(LLI_F_DID_GETATTR, &lli->lli_flags);
+        }
+
+        up(&lli->lli_getattr_sem);
+        RETURN(rc);
+}
+
+/*
+ * some callers, notably truncate, really don't want i_size set based
+ * on the the size returned by the getattr, or lock acquisition in 
+ * the future.
+ */
+int ll_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode,
+                   struct lov_stripe_md *lsm,
+                   int mode, struct ldlm_extent *extent,
+                   struct lustre_handle *lockh)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct ldlm_extent extent;
         int rc, flags = 0;
         ENTRY;
 
+        LASSERT(lockh->addr == 0 && lockh->cookie == 0);
+
         /* XXX phil: can we do this?  won't it screw the file size up? */
-        if (sbi->ll_flags & LL_SBI_NOLCK)
+        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
+            (sbi->ll_flags & LL_SBI_NOLCK))
                 RETURN(0);
 
-        extent.start = start;
-        extent.end = OBD_OBJECT_EOF;
+        CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
+               inode->i_ino, extent->start, extent->end);
 
-        rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent,
+        rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, extent,
                          sizeof(extent), mode, &flags, ll_lock_callback,
                          inode, sizeof(*inode), lockh);
+
         RETURN(rc);
 }
-
-int ll_size_unlock(struct inode *inode, struct lov_stripe_md *lsm, int mode,
+/*
+ * this grabs a lock and manually implements behaviour that makes it look
+ * like the OST is returning the file size with each lock acquisition
+ */
+int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
+                   struct lov_stripe_md *lsm,
+                   int mode, struct ldlm_extent *extent,
                    struct lustre_handle *lockh)
 {
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
         int rc;
         ENTRY;
 
-        /* XXX phil: can we do this?  won't it screw the file size up? */
-        if (sbi->ll_flags & LL_SBI_NOLCK)
-                RETURN(0);
+        rc = ll_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh);
 
-        rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh);
-        if (rc != ELDLM_OK) {
-                CERROR("lock cancel: %d\n", rc);
-                LBUG();
+        if (rc == ELDLM_OK) {
+                rc = ll_size_validate(inode, lsm, fd ? fd->fd_ostdata : NULL,
+                        extent);
+                if ( rc != 0 ) {
+                        ll_extent_unlock(fd, inode, lsm, mode, lockh);
+                        rc = ELDLM_GETATTR_ERROR;
+                }
         }
 
         RETURN(rc);
 }
 
-/* This function is solely "sampling" the file size, and does not explicit
- * locking on the size itself (see ll_size_lock() and ll_size_unlock()).
- *
- * XXX We need to optimize away the obd_getattr for decent performance here,
- *     by checking if we already have the size lock and considering our size
- *     authoritative in that case.  In order to do that either the act of
- *     getting the size lock includes retrieving the file size, or the client
- *     keeps an atomic flag in the inode which indicates whether the size
- *     has been updated (see bug 280).
- */
-int ll_file_size(struct inode *inode, struct lov_stripe_md *lsm, char *ostdata)
+int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
+                struct lov_stripe_md *lsm, int mode,
+                struct lustre_handle *lockh)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct obdo oa;
         int rc;
         ENTRY;
 
-        LASSERT(lsm);
-        LASSERT(sbi);
-
-        memset(&oa, 0, sizeof oa);
-        oa.o_id = lsm->lsm_object_id;
-        oa.o_mode = S_IFREG;
-        oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
-
-        if (ostdata != NULL) {
-                memcpy(&oa.o_inline, ostdata, FD_OSTDATA_SIZE);
-                oa.o_valid |= OBD_MD_FLHANDLE;
-        }
+        /* XXX phil: can we do this?  won't it screw the file size up? */
+        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
+            (sbi->ll_flags & LL_SBI_NOLCK))
+                RETURN(0);
 
-        rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
-        if (!rc) {
-                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-                CDEBUG(D_INODE, "objid "LPX64" size %Lu/%Lx\n",
-                       lsm->lsm_object_id, inode->i_size, inode->i_size);
-        }
+        rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh);
 
         RETURN(rc);
 }
@@ -481,6 +541,7 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                      void *data, int flag)
 {
         struct inode *inode = data;
+        struct ll_inode_info *lli = ll_i2info(inode);
         struct lustre_handle lockh = { 0, 0 };
         int rc;
         ENTRY;
@@ -497,11 +558,15 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
                 break;
         case LDLM_CB_CANCELING:
+                /* FIXME: we could be given 'canceling intents' so that we
+                 * could know to write-back or simply throw away the pages
+                 * based on if the cancel comes from a desire to, say,
+                 * read or truncate.. */
                 CDEBUG(D_INODE, "invalidating obdo/inode %lu\n", inode->i_ino);
-                /* FIXME: do something better than throwing away everything */
-                //down(&inode->i_sem);
-                ll_invalidate_inode_pages(inode);
-                //up(&inode->i_sem);
+                filemap_fdatasync(inode->i_mapping);
+                filemap_fdatawait(inode->i_mapping);
+                clear_bit(LLI_F_DID_GETATTR, &lli->lli_flags);
+                truncate_inode_pages(inode->i_mapping, 0);
                 break;
         default:
                 LBUG();
@@ -515,57 +580,49 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
 {
         struct ll_file_data *fd = filp->private_data;
         struct inode *inode = filp->f_dentry->d_inode;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lov_stripe_md *lsm = lli->lli_smd;
         struct lustre_handle lockh = { 0, 0 };
-        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        int flags = 0;
+        struct ll_read_extent rextent;
         ldlm_error_t err;
         ssize_t retval;
         ENTRY;
-
         CDEBUG(D_VFSTRACE, "VFS Op\n");
-        if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
-            !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                struct ldlm_extent extent;
-                extent.start = *ppos;
-                extent.end = *ppos + count - 1;
-                CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
-                       inode->i_ino, extent.start, extent.end);
-
-                err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
-                                  &extent, sizeof(extent), LCK_PR, &flags,
-                                  ll_lock_callback, inode, sizeof(*inode),
-                                  &lockh);
-                if (err != ELDLM_OK) {
-                        CERROR("lock enqueue: err: %d\n", err);
-                        RETURN(err);
-                }
-        }
 
-        /* If we don't refresh the file size, generic_file_read may not even
-         * call ll_readpage */
-        retval = ll_file_size(inode, lsm, fd->fd_ostdata);
-        if (retval < 0) {
-                CERROR("ll_file_size: "LPSZ"\n", retval);
+        /* "If nbyte is 0, read() will return 0 and have no other results."
+         *                      -- Single Unix Spec */
+        if (count == 0)
+                RETURN(0);
+
+        rextent.re_extent.start = *ppos;
+        rextent.re_extent.end = *ppos + count - 1;
+
+        err = ll_extent_lock(fd, inode, lsm, 
+                             LCK_PR, &rextent.re_extent, &lockh);
+        if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+                retval = -ENOLCK;
                 RETURN(retval);
         }
 
+        /* XXX tell ll_readpage what pages have a PR lock.. */
+        rextent.re_task = current;
+        spin_lock(&lli->lli_read_extent_lock);
+        list_add(&rextent.re_lli_item, &lli->lli_read_extents);
+        spin_unlock(&lli->lli_read_extent_lock);
+
         CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
                inode->i_ino, count, *ppos);
         retval = generic_file_read(filp, buf, count, ppos);
 
+        spin_lock(&lli->lli_read_extent_lock);
+        list_del(&rextent.re_lli_item);
+        spin_unlock(&lli->lli_read_extent_lock);
+
         if (retval > 0)
                 ll_update_atime(inode);
 
-        if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
-            !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, &lockh);
-                if (err != ELDLM_OK) {
-                        CERROR("lock cancel: err: %d\n", err);
-                        retval = err;
-                }
-        }
-
+        /* XXX errors? */
+        ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
         RETURN(retval);
 }
 
@@ -577,71 +634,43 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
 {
         struct ll_file_data *fd = file->private_data;
         struct inode *inode = file->f_dentry->d_inode;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct lustre_handle lockh = { 0, 0 }, eof_lockh = { 0, 0 };
+        struct lustre_handle lockh = { 0, 0 };
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        int flags = 0;
+        struct ldlm_extent extent;
         ldlm_error_t err;
         ssize_t retval;
         ENTRY;
 
         /* POSIX, but surprised the VFS doesn't check this already */
         if (count == 0)
-                return 0;
+                RETURN(0);
 
         CDEBUG(D_VFSTRACE, "VFS Op\n");
         if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
-                err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockh);
-                if (err)
-                        RETURN(err);
-
-                /* Get size here so we know extent to enqueue write lock on. */
-                retval = ll_file_size(inode, lsm, fd->fd_ostdata);
-                if (retval)
-                        GOTO(out_eof, retval);
-
-                *ppos = inode->i_size;
-        }
-
-        if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
-            !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                struct ldlm_extent extent;
+                extent.start = 0;
+                extent.end = OBD_OBJECT_EOF;
+        } else  {
                 extent.start = *ppos;
                 extent.end = *ppos + count - 1;
-                CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
-                       inode->i_ino, extent.start, extent.end);
-
-                err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
-                                  &extent, sizeof(extent), LCK_PW, &flags,
-                                  ll_lock_callback, inode, sizeof(*inode),
-                                  &lockh);
-                if (err != ELDLM_OK) {
-                        CERROR("lock enqueue: err: %d\n", err);
-                        GOTO(out_eof, retval = err);
-                }
         }
 
+        err = ll_extent_lock(fd, inode, lsm, LCK_PW, &extent, &lockh);
+        if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+                retval = -ENOLCK;
+                RETURN(retval);
+        }
+
+        if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND)
+                *ppos = inode->i_size;
+
         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                inode->i_ino, count, *ppos);
 
         retval = generic_file_write(file, buf, count, ppos);
 
-        if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
-            !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, &lockh);
-                if (err != ELDLM_OK)
-                        CERROR("lock cancel: err: %d\n", err);
-        }
-
-        EXIT;
- out_eof:
-        if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
-                err = ll_size_unlock(inode, lsm, LCK_PW, &eof_lockh);
-                if (err)
-                        CERROR("ll_size_unlock: %d\n", err);
-        }
-
-        return retval;
+        /* XXX errors? */
+        ll_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
+        RETURN(retval);
 }
 
 static int ll_lov_setstripe(struct inode *inode, struct file *file,
@@ -749,25 +778,27 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
 {
         struct inode *inode = file->f_dentry->d_inode;
-        long long retval;
+        struct ll_file_data *fd = file->private_data;
+        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+        struct lustre_handle lockh = {0, 0};
+        loff_t retval;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op\n");
-        switch (origin) {
-        case 2: {
-                struct ll_inode_info *lli = ll_i2info(inode);
-                struct ll_file_data *fd = file->private_data;
-
-                retval = ll_file_size(inode, lli->lli_smd, fd->fd_ostdata);
-                if (retval)
+        if (origin == 2) { /* SEEK_END */
+                ldlm_error_t err;
+                struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+                err = ll_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
+                if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+                        retval = -ENOLCK;
                         RETURN(retval);
+                }
 
                 offset += inode->i_size;
-                break;
-        }
-        case 1:
+        } else if (origin == 1) { /* SEEK_CUR */
                 offset += file->f_pos;
         }
+
         retval = -EINVAL;
         if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
                 if (offset != file->f_pos) {
@@ -779,14 +810,28 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
                 }
                 retval = offset;
         }
+
+        if (origin == 2)
+                ll_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
         RETURN(retval);
 }
 
-/* XXX this does not need to do anything for data, it _does_ need to
-   call setattr */
 int ll_fsync(struct file *file, struct dentry *dentry, int data)
 {
-        return 0;
+        int ret;
+        ENTRY;
+
+        /*
+         * filemap_fdata{sync,wait} are also called at PW lock cancelation so
+         * we know that they can only find data to writeback here if we are
+         * still holding the PW lock that covered the dirty pages.  XXX we
+         * should probably get a reference on it, though, just to be clear.
+         */
+        ret = filemap_fdatasync(dentry->d_inode->i_mapping);
+        if ( ret == 0 )
+                ret = filemap_fdatawait(dentry->d_inode->i_mapping);
+
+        RETURN(ret);
 }
 
 int ll_inode_revalidate(struct dentry *dentry)
@@ -848,10 +893,24 @@ int ll_inode_revalidate(struct dentry *dentry)
         if (!lsm)       /* object not yet allocated, don't validate size */
                 RETURN(0);
 
-        /* XXX this should probably become an unconditional obd_getattr()
-         *     so that we update the blocks count and mtime from the OST too.
+        /*
+         * unfortunately stat comes in through revalidate and we don't
+         * differentiate this use from initial instantiation.  we're
+         * also being wildly conservative and flushing write caches
+         * so that stat really returns the proper size.
          */
-        RETURN(ll_file_size(inode, lsm, NULL));
+        {
+                struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
+                struct lustre_handle lockh = {0, 0};
+                ldlm_error_t err;
+
+                err = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh);
+                if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED )
+                        RETURN(-abs(err)); /* XXX can't be right */
+
+                ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
+        }
+        RETURN(0);
 }
 
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
@@ -889,7 +948,7 @@ struct file_operations ll_file_operations = {
         release:        ll_file_release,
         mmap:           generic_file_mmap,
         llseek:         ll_file_seek,
-        fsync:          NULL
+        fsync:          ll_fsync,
 };
 
 struct inode_operations ll_file_inode_operations = {