Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdfilter / filter_io_24.c
index 4b2d012..49ca0fe 100644 (file)
@@ -8,23 +8,28 @@
  *   Author: Andreas Dilger <adilger@clusterfs.com>
  *   Author: Phil Schwan <phil@clusterfs.com>
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
  */
 
+#ifdef HAVE_KERNEL_CONFIG_H
 #include <linux/config.h>
+#endif
 #include <linux/module.h>
 #include <linux/pagemap.h> // XXX kill me soon
 #include <linux/version.h>
 #include <linux/iobuf.h>
 #include <linux/locks.h>
 
-#include <linux/obd_class.h>
-#include <linux/lustre_fsfilt.h>
+#include <obd_class.h>
+#include <lustre_fsfilt.h>
 #include "filter_internal.h"
 
-
-/* We should only change the file mtime (and not the ctime, like
- * update_inode_times() in generic_file_write()) when we only change data. */
-void inode_update_time(struct inode *inode, int ctime_too)
-{
-        time_t now = CURRENT_TIME;
-        if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
-                return;
-        inode->i_mtime = now;
-        if (ctime_too)
-                inode->i_ctime = now;
-        mark_inode_dirty_sync(inode);
-}
-
 /* Bug 2254 -- this is better done in ext3_map_inode_page, but this
  * workaround will suffice until everyone has upgraded their kernels */
 static void check_pending_bhs(unsigned long *blocks, int nr_pages, dev_t dev,
@@ -83,17 +74,17 @@ static void check_pending_bhs(unsigned long *blocks, int nr_pages, dev_t dev,
 static int filter_cleanup_mappings(int rw, struct kiobuf *iobuf,
                                    struct inode *inode)
 {
-        int i, blocks_per_page_bits = PAGE_SHIFT - inode->i_blkbits;
+        int i, blocks_per_page_bits = CFS_PAGE_SHIFT - inode->i_blkbits;
         ENTRY;
 
         for (i = 0 ; i < iobuf->nr_pages << blocks_per_page_bits; i++) {
-                if (iobuf->blocks[i] > 0)
+                if (KIOBUF_GET_BLOCKS(iobuf)[i] > 0)
                         continue;
 
                 if (rw == OBD_BRW_WRITE)
                         RETURN(-EINVAL);
 
-                iobuf->blocks[i] = -1UL;
+                KIOBUF_GET_BLOCKS(iobuf)[i] = -1UL;
         }
         RETURN(0);
 }
@@ -120,13 +111,9 @@ static void dump_page(int rw, unsigned long block, struct page *page)
  * free the buffers and drop the page from cache.  The buffers should not
  * be dirty, because we already called fdatasync/fdatawait on them.
  */
-static int filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
+static int filter_sync_inode_data(struct inode *inode)
 {
-        struct page *page;
-        int i, rc, rc2;
-
-        check_pending_bhs(KIOBUF_GET_BLOCKS(iobuf), iobuf->nr_pages,
-                          inode->i_dev, 1 << inode->i_blkbits);
+        int rc, rc2;
 
         /* This is nearly generic_osync_inode, without the waiting on the inode
         rc = generic_osync_inode(inode, inode->i_mapping,
@@ -139,6 +126,19 @@ static int filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
         rc2 = filemap_fdatawait(inode->i_mapping);
         if (rc == 0)
                 rc = rc2;
+
+        return rc;
+}
+
+static int filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
+{
+        struct page *page;
+        int i, rc;
+
+        check_pending_bhs(KIOBUF_GET_BLOCKS(iobuf), iobuf->nr_pages,
+                          inode->i_dev, 1 << inode->i_blkbits);
+
+        rc = filter_sync_inode_data(inode);
         if (rc != 0)
                 RETURN(rc);
 
@@ -149,6 +149,39 @@ static int filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
                                       iobuf->maplist[i]->index);
                 if (page == NULL)
                         continue;
+                if (page->mapping != NULL) {
+                        /* Now that the only source of such pages in truncate
+                         * path flushes these pages to disk and and then
+                         * discards, this is error condition */
+                        CERROR("Data page in page cache during write!\n");
+                        ll_truncate_complete_page(page);
+                }
+
+                unlock_page(page);
+                page_cache_release(page);
+        }
+
+        return 0;
+}
+
+int filter_clear_truncated_page(struct inode *inode)
+{
+        struct page *page;
+        int rc;
+
+        /* Truncate on page boundary, so nothing to flush? */
+        if (!(inode->i_size & ~CFS_PAGE_MASK))
+                return 0;
+
+        rc = filter_sync_inode_data(inode);
+        if (rc != 0)
+                RETURN(rc);
+
+        /* be careful to call this after fsync_inode_data_buffers has waited
+         * for IO to complete before we evict it from the cache */
+        page = find_lock_page(inode->i_mapping,
+                              inode->i_size >> CFS_PAGE_SHIFT);
+        if (page) {
                 if (page->mapping != NULL)
                         ll_truncate_complete_page(page);
 
@@ -160,15 +193,15 @@ static int filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
 }
 
 /* Must be called with i_sem taken for writes; this will drop it */
-int filter_direct_io(int rw, struct dentry *dchild, void *buf,
+int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *buf,
                      struct obd_export *exp, struct iattr *attr,
                      struct obd_trans_info *oti, void **wait_handle)
 {
         struct obd_device *obd = exp->exp_obd;
         struct inode *inode = dchild->d_inode;
-        struct kiobuf *iobuf = buf;
-        int rc, create = (rw == OBD_BRW_WRITE), *created = NULL, committed = 0;
-        int blocks_per_page = PAGE_SIZE >> inode->i_blkbits, cleanup_phase = 0;
+        struct kiobuf *iobuf = (void *)buf;
+        int rc, create = (rw == OBD_BRW_WRITE), committed = 0;
+        int blocks_per_page = CFS_PAGE_SIZE >> inode->i_blkbits, cleanup_phase = 0;
         struct semaphore *sem = NULL;
         ENTRY;
 
@@ -180,7 +213,7 @@ int filter_direct_io(int rw, struct dentry *dchild, void *buf,
         if (iobuf->nr_pages * blocks_per_page > KIO_MAX_SECTORS)
                 GOTO(cleanup, rc = -EINVAL);
 
-        if (iobuf->nr_pages * blocks_per_page > 
+        if (iobuf->nr_pages * blocks_per_page >
             OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
                 GOTO(cleanup, rc = -EINVAL);
 
@@ -195,9 +228,8 @@ int filter_direct_io(int rw, struct dentry *dchild, void *buf,
                 create = 1;
                 sem = &obd->u.filter.fo_alloc_lock;
         }
-        
         rc = fsfilt_map_inode_pages(obd, inode, iobuf->maplist,
-                                    iobuf->nr_pages, iobuf->blocks, 
+                                    iobuf->nr_pages, KIOBUF_GET_BLOCKS(iobuf),
                                     obdfilter_created_scratchpad, create, sem);
         if (rc)
                 GOTO(cleanup, rc);
@@ -207,17 +239,22 @@ int filter_direct_io(int rw, struct dentry *dchild, void *buf,
                 GOTO(cleanup, rc);
 
         if (rw == OBD_BRW_WRITE) {
-                filter_tally_write(&obd->u.filter, iobuf->maplist,
-                                   iobuf->nr_pages, iobuf->blocks,
-                                   blocks_per_page);
+                if (rc == 0) {
+                        filter_tally_write(exp, iobuf->maplist, iobuf->nr_pages,
+                                           KIOBUF_GET_BLOCKS(iobuf),
+                                           blocks_per_page);
+
+                        if (attr->ia_size > inode->i_size)
+                                attr->ia_valid |= ATTR_SIZE;
+                        rc = fsfilt_setattr(obd, dchild,
+                                            oti->oti_handle, attr, 0);
+                        if (rc)
+                                GOTO(cleanup, rc);
+                }
 
-                if (attr->ia_size > inode->i_size)
-                        attr->ia_valid |= ATTR_SIZE;
-                rc = fsfilt_setattr(obd, dchild, oti->oti_handle, attr, 0);
-                if (rc)
-                        GOTO(cleanup, rc);
                 up(&inode->i_sem);
                 cleanup_phase = 3;
+
                 rc = filter_finish_transno(exp, oti, 0);
                 if (rc)
                         GOTO(cleanup, rc);
@@ -226,6 +263,9 @@ int filter_direct_io(int rw, struct dentry *dchild, void *buf,
                 committed = 1;
                 if (rc)
                         GOTO(cleanup, rc);
+        } else {
+                filter_tally_read(exp, iobuf->maplist, iobuf->nr_pages,
+                                  KIOBUF_GET_BLOCKS(iobuf), blocks_per_page);
         }
 
         rc = filter_clear_page_cache(inode, iobuf);
@@ -234,18 +274,17 @@ int filter_direct_io(int rw, struct dentry *dchild, void *buf,
 
         rc = fsfilt_send_bio(rw, obd, inode, iobuf);
 
-        CDEBUG(D_INFO, "tried to write %d pages, rc = %d\n",
-               iobuf->nr_pages, rc);
+        CDEBUG(D_INFO, "tried to %s %d pages, rc = %d\n",
+               rw & OBD_BRW_WRITE ? "write" : "read", iobuf->nr_pages, rc);
 
         if (rc > 0)
                 rc = 0;
 
         EXIT;
 cleanup:
-        if (!committed && (rw == OBD_BRW_WRITE)) {                
+        if (!committed && (rw == OBD_BRW_WRITE)) {
                 int err = fsfilt_commit_async(obd, inode,
                                               oti->oti_handle, wait_handle);
-                oti->oti_handle = NULL;
                 if (err)
                         CERROR("can't close transaction: %d\n", err);
                 /*
@@ -260,7 +299,7 @@ cleanup:
                 unlock_kiovec(1, &iobuf);
         case 1:
         case 0:
-                if (cleanup_phase != 3 && rw == OBD_BRW_WRITE)            
+                if (cleanup_phase != 3 && rw == OBD_BRW_WRITE)
                         up(&inode->i_sem);
                 break;
         default:
@@ -307,47 +346,63 @@ static void clear_kiobuf(struct kiobuf *iobuf)
         iobuf->length = 0;
 }
 
-int filter_alloc_iobuf(int rw, int num_pages, void **ret)
+struct filter_iobuf *filter_alloc_iobuf(struct filter_obd *filter,
+                                        int rw, int num_pages)
 {
-        int rc;
         struct kiobuf *iobuf;
+        int rc;
         ENTRY;
 
         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
 
         rc = alloc_kiovec(1, &iobuf);
         if (rc)
-                RETURN(rc);
+                RETURN(ERR_PTR(rc));
 
         rc = expand_kiobuf(iobuf, num_pages);
         if (rc) {
                 free_kiovec(1, &iobuf);
-                RETURN(rc);
+                RETURN(ERR_PTR(rc));
         }
 
 #ifdef HAVE_KIOBUF_DOVARY
         iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
 #endif
         clear_kiobuf(iobuf);
-        *ret = iobuf;
-        RETURN(0);
+        RETURN((void *)iobuf);
 }
 
-void filter_free_iobuf(void *buf)
+void filter_free_iobuf(struct filter_iobuf *buf)
 {
-        struct kiobuf *iobuf = buf;
+        struct kiobuf *iobuf = (void *)buf;
 
         clear_kiobuf(iobuf);
         free_kiovec(1, &iobuf);
 }
 
-int filter_iobuf_add_page(struct obd_device *obd, void *buf,
+void filter_iobuf_put(struct filter_obd *filter, struct filter_iobuf *iobuf,
+                      struct obd_trans_info *oti)
+{
+        int thread_id = oti ? oti->oti_thread_id : -1;
+
+        if (unlikely(thread_id < 0)) {
+                filter_free_iobuf(iobuf);
+                return;
+        }
+
+        LASSERTF(filter->fo_iobuf_pool[thread_id] == iobuf,
+                 "iobuf mismatch for thread %d: pool %p iobuf %p\n",
+                 thread_id, filter->fo_iobuf_pool[thread_id], iobuf);
+        clear_kiobuf((void *)iobuf);
+}
+
+int filter_iobuf_add_page(struct obd_device *obd, struct filter_iobuf *buf,
                            struct inode *inode, struct page *page)
 {
-        struct kiobuf *iobuf = buf;
+        struct kiobuf *iobuf = (void *)buf;
 
         iobuf->maplist[iobuf->nr_pages++] = page;
-        iobuf->length += PAGE_SIZE;
+        iobuf->length += CFS_PAGE_SIZE;
 
         return 0;
 }
@@ -375,9 +430,9 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
         if (rc != 0)
                 GOTO(cleanup, rc);
 
-        rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, &iobuf);
-        if (rc)
-                GOTO(cleanup, rc);
+        iobuf = filter_iobuf_get(&obd->u.filter, oti);
+        if (IS_ERR(iobuf))
+                GOTO(cleanup, rc = PTR_ERR(iobuf));
         cleanup_phase = 1;
 
         fso.fso_dentry = res->dentry;
@@ -396,7 +451,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                         continue;
 
                 filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
-                
+
                 /* We expect these pages to be in offset order, but we'll
                  * be forgiving */
                 this_size = lnb->offset + lnb->len;
@@ -419,25 +474,55 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                 GOTO(cleanup, rc);
         }
 
-        fsfilt_check_slow(now, obd_timeout, "brw_start");
+        fsfilt_check_slow(obd, now, obd_timeout, "brw_start");
+
+        i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+        /* If the inode still has SUID+SGID bits set (see filter_precreate())
+         * then we will accept the UID+GID if sent by the client for
+         * initializing the ownership of this inode.  We only allow this to
+         * happen once (so clear these bits) and later only allow setattr. */
+        if (inode->i_mode & S_ISUID)
+                i |= OBD_MD_FLUID;
+        if (inode->i_mode & S_ISGID)
+                i |= OBD_MD_FLGID;
+
+        iattr_from_obdo(&iattr, oa, i);
+        if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
+                CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
+                       (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);
+
+                cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
+
+                iattr.ia_valid |= ATTR_MODE;
+                iattr.ia_mode = inode->i_mode;
+                if (iattr.ia_valid & ATTR_UID)
+                        iattr.ia_mode &= ~S_ISUID;
+                if (iattr.ia_valid & ATTR_GID)
+                        iattr.ia_mode &= ~S_ISGID;
+
+                rc = filter_update_fidea(exp, inode, oti->oti_handle, oa);
+        }
 
-        iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
         /* filter_direct_io drops i_sem */
         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
                               oti, &wait_handle);
         if (rc == 0)
                 obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
 
-        fsfilt_check_slow(now, obd_timeout, "direct_io");
+        fsfilt_check_slow(obd, now, obd_timeout, "direct_io");
 
         err = fsfilt_commit_wait(obd, inode, wait_handle);
-        if (err)
+        if (err) {
+                CERROR("Failure to commit OST transaction (%d)?\n", err);
                 rc = err;
-        if (obd_sync_filter && !err)
+        }
+        if (obd->obd_replayable && !rc)
                 LASSERTF(oti->oti_transno <= obd->obd_last_committed,
                          "oti_transno "LPU64" last_committed "LPU64"\n",
                          oti->oti_transno, obd->obd_last_committed);
-        fsfilt_check_slow(now, obd_timeout, "commitrw commit");
+        fsfilt_check_slow(obd, now, obd_timeout, "commitrw commit");
+
 cleanup:
         filter_grant_commit(exp, niocount, res);
 
@@ -446,9 +531,12 @@ cleanup:
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 LASSERT(current->journal_info == NULL);
         case 1:
-                filter_free_iobuf(iobuf);
+                filter_iobuf_put(&obd->u.filter, iobuf, oti);
         case 0:
-                filter_free_dio_pages(objcount, obj, niocount, res);
+                /*
+                 * lnb->page automatically returns back into per-thread page
+                 * pool (bug 5137)
+                 */
                 f_dput(res->dentry);
         }