From 95754fe966d8276da085c8c4d51cb279ffe6552f Mon Sep 17 00:00:00 2001 From: phil Date: Mon, 24 Nov 2003 04:45:54 +0000 Subject: [PATCH] b=1883 - Adds llite_close.c; Zach's originally, somewhat heavily hacked - Adds an MDS_DONE_WRITING rpc and handler - Adds client-side code for storing the MDS FID and epoch in the obdo's inline field in BRW_WRITE requests --- lustre/llite/llite_close.c | 263 ++++++++++++++++++++++++++++++++++++++++++ lustre/llite/llite_internal.h | 25 +++- lustre/mds/mds_internal.h | 1 + lustre/obdclass/obdo.c | 3 + 4 files changed, 288 insertions(+), 4 deletions(-) create mode 100644 lustre/llite/llite_close.c diff --git a/lustre/llite/llite_close.c b/lustre/llite/llite_close.c new file mode 100644 index 0000000..bf064c0 --- /dev/null +++ b/lustre/llite/llite_close.c @@ -0,0 +1,263 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Lustre Lite routines to issue a secondary close after writeback + * + * Copyright (c) 2001-2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include + +#define DEBUG_SUBSYSTEM S_LLITE + +#include +#include +#include "llite_internal.h" + +/* record that a write is in flight */ +void llap_write_pending(struct inode *inode, struct ll_async_page *llap) +{ + struct ll_inode_info *lli = ll_i2info(inode); + spin_lock(&lli->lli_lock); + list_add(&llap->llap_pending_write, &lli->lli_pending_write_llaps); + spin_unlock(&lli->lli_lock); +} + +/* record that a write has completed */ +void llap_write_complete(struct inode *inode, struct ll_async_page *llap) +{ + struct ll_inode_info *lli = ll_i2info(inode); + spin_lock(&lli->lli_lock); + if (!list_empty(&llap->llap_pending_write)) + list_del_init(&llap->llap_pending_write); + spin_unlock(&lli->lli_lock); +} + +void ll_open_complete(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + spin_lock(&lli->lli_lock); + lli->lli_send_done_writing = 0; + spin_unlock(&lli->lli_lock); +} + +/* if we close with writes in flight then we want the completion or cancelation + * of those writes to send a DONE_WRITING rpc to the MDS */ +int ll_is_inode_dirty(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + int rc = 0; + ENTRY; + + spin_lock(&lli->lli_lock); + if (!list_empty(&lli->lli_pending_write_llaps)) + rc = 1; + spin_unlock(&lli->lli_lock); + RETURN(rc); +} + +void ll_try_done_writing(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq; + + spin_lock(&lli->lli_lock); + + if (lli->lli_send_done_writing && + list_empty(&lli->lli_pending_write_llaps)) { + + spin_lock(&lcq->lcq_lock); + if (list_empty(&lli->lli_close_item)) { + CDEBUG(D_INODE, "adding inode %lu/%u to close list\n", + inode->i_ino, inode->i_generation); + LASSERT(igrab(inode) == inode); + list_add_tail(&lli->lli_close_item, &lcq->lcq_list); + wake_up(&lcq->lcq_waitq); + } + spin_unlock(&lcq->lcq_lock); + } + + spin_unlock(&lli->lli_lock); +} + +/* The MDS needs us to get the real file attributes, then send a DONE_WRITING */ +void ll_queue_done_writing(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + ENTRY; + + spin_lock(&lli->lli_lock); + lli->lli_send_done_writing = 1; + spin_unlock(&lli->lli_lock); + + ll_try_done_writing(inode); + EXIT; +} + +/* If we know the file size and have the cookies: + * - send a DONE_WRITING rpc + * + * Otherwise: + * - get a whole-file lock + * - get the authoritative size and all cookies with GETATTRs + * - send a DONE_WRITING rpc + */ +static void ll_close_done_writing(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ldlm_extent extent = { .start = 0, .end = OBD_OBJECT_EOF }; + struct lustre_handle lockh = { 0 }; + struct obdo obdo; + obd_flag valid; + int rc, ast_flags = 0; + ENTRY; + + memset(&obdo, 0, sizeof(obdo)); + if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags)) + goto rpc; + + rc = ll_extent_lock_no_validate(NULL, inode, lli->lli_smd, LCK_PW, + &extent, &lockh, ast_flags); + if (rc != ELDLM_OK) { + CERROR("lock acquisition failed (%d): unable to send " + "DONE_WRITING for inode %lu/%u\n", rc, inode->i_ino, + inode->i_generation); + GOTO(out, rc); + } + + rc = ll_lsm_getattr(ll_i2obdexp(inode), lli->lli_smd, &obdo); + if (rc) { + CERROR("inode_getattr failed (%d): unable to send DONE_WRITING " + "for inode %lu/%u\n", rc, inode->i_ino, + inode->i_generation); + ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh); + GOTO(out, rc); + } + + obdo_refresh_inode(inode, &obdo, valid); + + CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n", + lli->lli_smd->lsm_object_id, inode->i_size, inode->i_blocks, + inode->i_blksize); + + set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags); + + rc = ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh); + if (rc != ELDLM_OK) + CERROR("unlock failed (%d)? proceeding anyways...\n", rc); + + rpc: + obdo.o_id = inode->i_ino; + obdo.o_size = inode->i_size; + obdo.o_blocks = inode->i_blocks; + obdo.o_valid = OBD_MD_FLID | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + + rc = mdc_done_writing(ll_i2sbi(inode)->ll_mdc_exp, &obdo); + out: + iput(inode); +} + +static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq) +{ + struct ll_inode_info *lli = NULL; + + spin_lock(&lcq->lcq_lock); + + if (lcq->lcq_list.next == NULL) + lli = ERR_PTR(-1); + else if (!list_empty(&lcq->lcq_list)) { + lli = list_entry(lcq->lcq_list.next, struct ll_inode_info, + lli_close_item); + list_del(&lli->lli_close_item); + } + + spin_unlock(&lcq->lcq_lock); + return lli; +} + +static int ll_close_thread(void *arg) +{ + struct ll_close_queue *lcq = arg; + ENTRY; + + /* XXX boiler-plate */ + { + char name[sizeof(current->comm)]; + unsigned long flags; + snprintf(name, sizeof(name) - 1, "ll_close"); + kportal_daemonize(name); + SIGNAL_MASK_LOCK(current, flags); + sigfillset(¤t->blocked); + RECALC_SIGPENDING; + SIGNAL_MASK_UNLOCK(current, flags); + } + + complete(&lcq->lcq_comp); + + while (1) { + struct l_wait_info lwi = { 0 }; + struct ll_inode_info *lli; + struct inode *inode; + + l_wait_event_exclusive(lcq->lcq_waitq, + (lli = ll_close_next_lli(lcq)) != NULL, + &lwi); + if (IS_ERR(lli)) + break; + + inode = ll_info2i(lli); + ll_close_done_writing(inode); + } + + complete(&lcq->lcq_comp); + RETURN(0); +} + +int ll_close_thread_start(struct ll_close_queue **lcq_ret) +{ + struct ll_close_queue *lcq; + pid_t pid; + + OBD_ALLOC(lcq, sizeof(*lcq)); + if (lcq == NULL) + return -ENOMEM; + + spin_lock_init(&lcq->lcq_lock); + INIT_LIST_HEAD(&lcq->lcq_list); + init_waitqueue_head(&lcq->lcq_waitq); + init_completion(&lcq->lcq_comp); + + pid = kernel_thread(ll_close_thread, lcq, 0); + if (pid < 0) { + OBD_FREE(lcq, sizeof(*lcq)); + return pid; + } + + wait_for_completion(&lcq->lcq_comp); + *lcq_ret = lcq; + return 0; +} + +void ll_close_thread_shutdown(struct ll_close_queue *lcq) +{ + init_completion(&lcq->lcq_comp); + lcq->lcq_list.next = NULL; + wake_up(&lcq->lcq_waitq); + wait_for_completion(&lcq->lcq_comp); + OBD_FREE(lcq, sizeof(*lcq)); +} diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index a1d8faa..14e04a7 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -93,6 +93,7 @@ struct ll_async_page { void *llap_cookie; int llap_queued; struct page *llap_page; + struct list_head llap_pending_write; }; #define LL_CDEBUG_PAGE(page, STR) \ @@ -122,10 +123,9 @@ void ll_prepare_mdc_op_data(struct mdc_op_data *, const char *name, int namelen, int mode); /* llite/rw.c */ -int ll_prepare_write(struct file *file, struct page *page, unsigned from, - unsigned to); -int ll_commit_write(struct file *file, struct page *page, unsigned from, - unsigned to); +int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to); +int ll_commit_write(struct file *, struct page *, unsigned from, unsigned to); +void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa); #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) #define ll_ap_completion ll_ap_completion_24 void ll_ap_completion_24(void *data, int cmd, int rc); @@ -222,6 +222,23 @@ int ll_dentry_to_fh(struct dentry *, __u32 *datap, int *lenp, int need_parent); /* llite/symlink.c */ extern struct inode_operations ll_fast_symlink_inode_operations; +/* llite/llite_close.c */ +struct ll_close_queue { + spinlock_t lcq_lock; + struct list_head lcq_list; + wait_queue_head_t lcq_waitq; + struct completion lcq_comp; +}; + +void llap_write_pending(struct inode *inode, struct ll_async_page *llap); +void llap_write_complete(struct inode *inode, struct ll_async_page *llap); +void ll_open_complete(struct inode *inode); +int ll_is_inode_dirty(struct inode *inode); +void ll_try_done_writing(struct inode *inode); +void ll_queue_done_writing(struct inode *inode); +void ll_close_thread_shutdown(struct ll_close_queue *lcq); +int ll_close_thread_start(struct ll_close_queue **lcq_ret); + /* generic */ #define LL_SUPER_MAGIC 0x0BD00BD0 diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index 96d7653..5dcd667 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -64,6 +64,7 @@ int mds_pin(struct ptlrpc_request *req); int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd, struct mds_file_data *mfd, int unlink_orphan); int mds_close(struct ptlrpc_request *req); +int mds_done_writing(struct ptlrpc_request *req); /* mds/mds_fs.c */ diff --git a/lustre/obdclass/obdo.c b/lustre/obdclass/obdo.c index 73ca0ee..aa604f8 100644 --- a/lustre/obdclass/obdo.c +++ b/lustre/obdclass/obdo.c @@ -35,6 +35,7 @@ #ifdef __KERNEL__ #include +#include /* for PAGE_CACHE_SIZE */ void obdo_from_iattr(struct obdo *oa, struct iattr *attr, unsigned int ia_valid) { @@ -220,6 +221,8 @@ void obdo_refresh_inode(struct inode *dst, struct obdo *src, obd_flag valid) /* optimum IO size */ if (valid & OBD_MD_FLBLKSZ && src->o_blksize > dst->i_blksize) dst->i_blksize = src->o_blksize; + if (dst->i_blksize < PAGE_CACHE_SIZE) + dst->i_blksize = PAGE_CACHE_SIZE; /* allocation of space */ if (valid & OBD_MD_FLBLOCKS && src->o_blocks > dst->i_blocks) dst->i_blocks = src->o_blocks; -- 1.8.3.1