Whamcloud - gitweb
b=1883
author phil <phil>
Mon, 24 Nov 2003 04:45:54 +0000 (04:45 +0000)
committer phil <phil>
Mon, 24 Nov 2003 04:45:54 +0000 (04:45 +0000)
- Adds llite_close.c; Zach's originally, somewhat heavily hacked
- Adds an MDS_DONE_WRITING rpc and handler
- Adds client-side code for storing the MDS FID and epoch in the
  obdo's inline field in BRW_WRITE requests

lustre/llite/llite_close.c [new file with mode: 0644]
lustre/llite/llite_internal.h
lustre/mds/mds_internal.h
lustre/obdclass/obdo.c

diff --git a/lustre/llite/llite_close.c b/lustre/llite/llite_close.c
new file mode 100644 (file)
index 0000000..bf064c0
--- /dev/null
@@ -0,0 +1,263 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Lustre Lite routines to issue a secondary close after writeback
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <linux/module.h>
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <linux/lustre_mds.h>
+#include <linux/lustre_lite.h>
+#include "llite_internal.h"
+
+/*
+ * Record that a write is in flight: link @llap onto @inode's list of
+ * pending writes.  The list is modified under lli_lock.
+ */
+void llap_write_pending(struct inode *inode, struct ll_async_page *llap)
+{
+        struct ll_inode_info *info = ll_i2info(inode);
+
+        spin_lock(&info->lli_lock);
+        list_add(&llap->llap_pending_write, &info->lli_pending_write_llaps);
+        spin_unlock(&info->lli_lock);
+}
+
+/*
+ * Record that a write has completed: unlink @llap from @inode's list of
+ * pending writes, under lli_lock.  An llap that is not currently linked
+ * is left untouched, so calling this twice for the same page is harmless.
+ */
+void llap_write_complete(struct inode *inode, struct ll_async_page *llap)
+{
+        struct ll_inode_info *info = ll_i2info(inode);
+        struct list_head *link = &llap->llap_pending_write;
+
+        spin_lock(&info->lli_lock);
+        if (!list_empty(link))
+                list_del_init(link);
+        spin_unlock(&info->lli_lock);
+}
+
+/*
+ * Clear lli_send_done_writing under lli_lock.  While the flag is clear,
+ * ll_try_done_writing() will not queue this inode; the flag is set again
+ * by ll_queue_done_writing().
+ */
+void ll_open_complete(struct inode *inode)
+{
+        struct ll_inode_info *info = ll_i2info(inode);
+
+        spin_lock(&info->lli_lock);
+        info->lli_send_done_writing = 0;
+        spin_unlock(&info->lli_lock);
+}
+
+/*
+ * If we close with writes in flight then we want the completion or
+ * cancellation of those writes to send a DONE_WRITING rpc to the MDS.
+ *
+ * Returns 1 if @inode still has entries on its pending-write list,
+ * 0 otherwise.  The list is sampled under lli_lock.
+ */
+int ll_is_inode_dirty(struct inode *inode)
+{
+        struct ll_inode_info *info = ll_i2info(inode);
+        int dirty;
+        ENTRY;
+
+        spin_lock(&info->lli_lock);
+        dirty = !list_empty(&info->lli_pending_write_llaps);
+        spin_unlock(&info->lli_lock);
+
+        RETURN(dirty);
+}
+
+/*
+ * Queue @inode for the ll_close thread if a DONE_WRITING has been requested
+ * (lli_send_done_writing is set) and no writes remain on the pending list.
+ *
+ * An inode that is already on the close queue is not queued twice.  When
+ * the inode is queued, a reference is taken with igrab(); the matching
+ * iput() is done by ll_close_done_writing().
+ *
+ * Lock order: lli_lock, then lcq_lock.
+ */
+void ll_try_done_writing(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
+        struct inode *grabbed;
+
+        spin_lock(&lli->lli_lock);
+
+        if (lli->lli_send_done_writing &&
+            list_empty(&lli->lli_pending_write_llaps)) {
+
+                spin_lock(&lcq->lcq_lock);
+                if (list_empty(&lli->lli_close_item)) {
+                        CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
+                               inode->i_ino, inode->i_generation);
+                        /* Take the reference outside of LASSERT(): if the
+                         * assertion macro is ever compiled out, an igrab()
+                         * placed inside it would vanish with it and the
+                         * close thread's iput() would drop a reference
+                         * that was never taken. */
+                        grabbed = igrab(inode);
+                        LASSERT(grabbed == inode);
+                        list_add_tail(&lli->lli_close_item, &lcq->lcq_list);
+                        wake_up(&lcq->lcq_waitq);
+                }
+                spin_unlock(&lcq->lcq_lock);
+        }
+
+        spin_unlock(&lli->lli_lock);
+}
+
+/*
+ * The MDS needs us to get the real file attributes, then send a
+ * DONE_WRITING.
+ *
+ * Mark @inode as wanting a DONE_WRITING rpc (under lli_lock), then give
+ * ll_try_done_writing() a chance to queue it straight away.
+ */
+void ll_queue_done_writing(struct inode *inode)
+{
+        struct ll_inode_info *info = ll_i2info(inode);
+        ENTRY;
+
+        spin_lock(&info->lli_lock);
+        info->lli_send_done_writing = 1;
+        spin_unlock(&info->lli_lock);
+
+        /* the flag is re-checked under the lock inside this call */
+        ll_try_done_writing(inode);
+        EXIT;
+}
+
+/*
+ * Send a DONE_WRITING rpc to the MDS for @inode, then drop the inode
+ * reference handed in by the caller.
+ *
+ * If we know the file size and have the cookies:
+ *  - send a DONE_WRITING rpc
+ *
+ * Otherwise:
+ *  - get a whole-file lock
+ *  - get the authoritative size and all cookies with GETATTRs
+ *  - send a DONE_WRITING rpc
+ *
+ * Every path, including the failure paths, ends in iput(inode).  Failures
+ * are logged; nothing is returned to the caller.
+ */
+static void ll_close_done_writing(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ldlm_extent extent = { .start = 0, .end = OBD_OBJECT_EOF };
+        struct lustre_handle lockh = { 0 };
+        struct obdo obdo;
+        int rc, ast_flags = 0;
+        ENTRY;
+
+        memset(&obdo, 0, sizeof(obdo));
+        /* flag already set: skip the lock + getattr and go straight to
+         * the rpc with the attributes cached in the inode */
+        if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
+                goto rpc;
+
+        rc = ll_extent_lock_no_validate(NULL, inode, lli->lli_smd, LCK_PW,
+                                        &extent, &lockh, ast_flags);
+        if (rc != ELDLM_OK) {
+                CERROR("lock acquisition failed (%d): unable to send "
+                       "DONE_WRITING for inode %lu/%u\n", rc, inode->i_ino,
+                       inode->i_generation);
+                GOTO(out, rc);
+        }
+
+        rc = ll_lsm_getattr(ll_i2obdexp(inode), lli->lli_smd, &obdo);
+        if (rc) {
+                CERROR("inode_getattr failed (%d): unable to send DONE_WRITING "
+                       "for inode %lu/%u\n", rc, inode->i_ino,
+                       inode->i_generation);
+                ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh);
+                GOTO(out, rc);
+        }
+
+        /* Refresh only the attributes the getattr result marks as valid.
+         * This call used to be handed a local variable that was never
+         * assigned, i.e. an indeterminate mask.
+         * NOTE(review): assumes ll_lsm_getattr() fills in obdo.o_valid on
+         * success -- confirm against its definition. */
+        obdo_refresh_inode(inode, &obdo, obdo.o_valid);
+
+        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
+               lli->lli_smd->lsm_object_id, inode->i_size, inode->i_blocks,
+               inode->i_blksize);
+
+        set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
+
+        rc = ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh);
+        if (rc != ELDLM_OK)
+                CERROR("unlock failed (%d)?  proceeding anyways...\n", rc);
+
+ rpc:
+        /* the rpc carries the inode number, size and block count */
+        obdo.o_id = inode->i_ino;
+        obdo.o_size = inode->i_size;
+        obdo.o_blocks = inode->i_blocks;
+        obdo.o_valid = OBD_MD_FLID | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+
+        rc = mdc_done_writing(ll_i2sbi(inode)->ll_mdc_exp, &obdo);
+        if (rc)
+                CERROR("DONE_WRITING failed (%d) for inode %lu/%u\n", rc,
+                       inode->i_ino, inode->i_generation);
+ out:
+        /* drop the reference taken when the inode was put on the queue */
+        iput(inode);
+        EXIT;
+}
+
+/*
+ * Dequeue the next inode waiting for a DONE_WRITING.
+ *
+ * Returns:
+ *  - ERR_PTR(-1) if the queue has been marked for shutdown
+ *    (lcq_list.next == NULL, set by ll_close_thread_shutdown());
+ *  - NULL if the queue is empty;
+ *  - otherwise the first queued ll_inode_info, already unlinked.
+ */
+static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
+{
+        struct ll_inode_info *lli = NULL;
+
+        spin_lock(&lcq->lcq_lock);
+
+        if (lcq->lcq_list.next == NULL)
+                lli = ERR_PTR(-1);
+        else if (!list_empty(&lcq->lcq_list)) {
+                lli = list_entry(lcq->lcq_list.next, struct ll_inode_info,
+                                 lli_close_item);
+                /* Re-initialise the link on removal: ll_try_done_writing()
+                 * uses list_empty(&lli->lli_close_item) to decide whether
+                 * the inode is already queued, and a plain list_del()
+                 * would leave it looking queued forever. */
+                list_del_init(&lli->lli_close_item);
+        }
+
+        spin_unlock(&lcq->lcq_lock);
+        return lli;
+}
+
+/*
+ * Body of the "ll_close" kernel thread.
+ *
+ * Signals lcq_comp once it is set up, then sleeps on lcq_waitq and hands
+ * each inode pulled off the queue to ll_close_done_writing().  When
+ * ll_close_next_lli() reports shutdown (an ERR_PTR), the loop ends and
+ * lcq_comp is signalled a second time before the thread returns 0.
+ */
+static int ll_close_thread(void *arg)
+{
+        struct ll_close_queue *lcq = arg;
+        ENTRY;
+
+        /* XXX boiler-plate */
+        {
+                char name[sizeof(current->comm)];
+                unsigned long flags;
+
+                snprintf(name, sizeof(name) - 1, "ll_close");
+                kportal_daemonize(name);
+
+                /* block every signal for this thread */
+                SIGNAL_MASK_LOCK(current, flags);
+                sigfillset(&current->blocked);
+                RECALC_SIGPENDING;
+                SIGNAL_MASK_UNLOCK(current, flags);
+        }
+
+        /* let ll_close_thread_start() know we are running */
+        complete(&lcq->lcq_comp);
+
+        for (;;) {
+                struct l_wait_info lwi = { 0 };
+                struct ll_inode_info *lli;
+
+                /* the wait condition itself dequeues the next entry */
+                l_wait_event_exclusive(lcq->lcq_waitq,
+                                       (lli = ll_close_next_lli(lcq)) != NULL,
+                                       &lwi);
+                if (IS_ERR(lli))
+                        break;
+
+                ll_close_done_writing(ll_info2i(lli));
+        }
+
+        /* let ll_close_thread_shutdown() know we are gone */
+        complete(&lcq->lcq_comp);
+        RETURN(0);
+}
+
+/*
+ * Allocate and initialise a close queue and start the thread serving it.
+ *
+ * On success, returns 0 after the thread has signalled that it is running,
+ * and stores the new queue in *lcq_ret.  Returns -ENOMEM if the queue
+ * cannot be allocated, or the negative value from kernel_thread() if the
+ * thread cannot be created; in both cases *lcq_ret is left untouched.
+ */
+int ll_close_thread_start(struct ll_close_queue **lcq_ret)
+{
+        struct ll_close_queue *queue;
+        pid_t pid;
+
+        OBD_ALLOC(queue, sizeof(*queue));
+        if (queue == NULL)
+                return -ENOMEM;
+
+        spin_lock_init(&queue->lcq_lock);
+        INIT_LIST_HEAD(&queue->lcq_list);
+        init_waitqueue_head(&queue->lcq_waitq);
+        init_completion(&queue->lcq_comp);
+
+        pid = kernel_thread(ll_close_thread, queue, 0);
+        if (pid >= 0) {
+                /* the thread completes lcq_comp once it is set up */
+                wait_for_completion(&queue->lcq_comp);
+                *lcq_ret = queue;
+                return 0;
+        }
+
+        OBD_FREE(queue, sizeof(*queue));
+        return pid;
+}
+
+/*
+ * Stop the close thread started by ll_close_thread_start() and free @lcq.
+ *
+ * Shutdown is signalled by setting lcq_list.next to NULL, which
+ * ll_close_next_lli() turns into an ERR_PTR for the thread.  This function
+ * returns only after the thread has completed lcq_comp on its way out.
+ *
+ * NOTE(review): inodes still on the queue at this point are not processed
+ * and their references are not dropped here; callers presumably make sure
+ * the queue has drained first -- confirm.
+ */
+void ll_close_thread_shutdown(struct ll_close_queue *lcq)
+{
+        init_completion(&lcq->lcq_comp);
+
+        /* The sentinel is read under lcq_lock by ll_close_next_lli() and
+         * the list head is modified under it by ll_try_done_writing(), so
+         * store it under the same lock rather than racing with them. */
+        spin_lock(&lcq->lcq_lock);
+        lcq->lcq_list.next = NULL;
+        spin_unlock(&lcq->lcq_lock);
+
+        wake_up(&lcq->lcq_waitq);
+        wait_for_completion(&lcq->lcq_comp);
+        OBD_FREE(lcq, sizeof(*lcq));
+}
index a1d8faa..14e04a7 100644 (file)
@@ -93,6 +93,7 @@ struct ll_async_page {
         void            *llap_cookie;
         int             llap_queued;
         struct page     *llap_page;
+        struct list_head llap_pending_write;
 };
 
 #define LL_CDEBUG_PAGE(page, STR)                                       \
@@ -122,10 +123,9 @@ void ll_prepare_mdc_op_data(struct mdc_op_data *,
                             const char *name, int namelen, int mode);
 
 /* llite/rw.c */
-int ll_prepare_write(struct file *file, struct page *page, unsigned from,
-                            unsigned to);
-int ll_commit_write(struct file *file, struct page *page, unsigned from,
-                    unsigned to);
+int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to);
+int ll_commit_write(struct file *, struct page *, unsigned from, unsigned to);
+void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa);
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 #define ll_ap_completion ll_ap_completion_24
 void ll_ap_completion_24(void *data, int cmd, int rc);
@@ -222,6 +222,23 @@ int ll_dentry_to_fh(struct dentry *, __u32 *datap, int *lenp, int need_parent);
 /* llite/symlink.c */
 extern struct inode_operations ll_fast_symlink_inode_operations;
 
+/* llite/llite_close.c */
+/* Queue of inodes waiting for the ll_close thread to send DONE_WRITING. */
+struct ll_close_queue {
+        /* taken by ll_try_done_writing() and ll_close_next_lli() around
+         * their accesses to lcq_list */
+        spinlock_t              lcq_lock;
+        /* queued inodes, linked through lli_close_item; .next is set to
+         * NULL by ll_close_thread_shutdown() to tell the thread to exit */
+        struct list_head        lcq_list;
+        /* the ll_close thread sleeps here; woken on enqueue and shutdown */
+        wait_queue_head_t       lcq_waitq;
+        /* completed by the thread once it has started, and again just
+         * before it exits */
+        struct completion       lcq_comp;
+};
+
+void llap_write_pending(struct inode *inode, struct ll_async_page *llap);
+void llap_write_complete(struct inode *inode, struct ll_async_page *llap);
+void ll_open_complete(struct inode *inode);
+int ll_is_inode_dirty(struct inode *inode);
+void ll_try_done_writing(struct inode *inode);
+void ll_queue_done_writing(struct inode *inode);
+void ll_close_thread_shutdown(struct ll_close_queue *lcq);
+int ll_close_thread_start(struct ll_close_queue **lcq_ret);
+
 /* generic */
 #define LL_SUPER_MAGIC 0x0BD00BD0
 
index 96d7653..5dcd667 100644 (file)
@@ -64,6 +64,7 @@ int mds_pin(struct ptlrpc_request *req);
 int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
                   struct mds_file_data *mfd, int unlink_orphan);
 int mds_close(struct ptlrpc_request *req);
+int mds_done_writing(struct ptlrpc_request *req);
 
 
 /* mds/mds_fs.c */
index 73ca0ee..aa604f8 100644 (file)
@@ -35,6 +35,7 @@
 
 #ifdef __KERNEL__
 #include <linux/fs.h>
+#include <linux/pagemap.h> /* for PAGE_CACHE_SIZE */
 
 void obdo_from_iattr(struct obdo *oa, struct iattr *attr, unsigned int ia_valid)
 {
@@ -220,6 +221,8 @@ void obdo_refresh_inode(struct inode *dst, struct obdo *src, obd_flag valid)
         /* optimum IO size */
         if (valid & OBD_MD_FLBLKSZ && src->o_blksize > dst->i_blksize)
                 dst->i_blksize = src->o_blksize;
+        if (dst->i_blksize < PAGE_CACHE_SIZE)
+                dst->i_blksize = PAGE_CACHE_SIZE;
         /* allocation of space */
         if (valid & OBD_MD_FLBLOCKS && src->o_blocks > dst->i_blocks)
                 dst->i_blocks = src->o_blocks;