From: zab Date: Fri, 25 Jul 2003 22:13:12 +0000 (+0000) Subject: - move the osc rpc managing thread into the osc and rename it appropriately X-Git-Tag: v1_7_0_51~2^14~26 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=3f41ff62c9e2515946384ee3bbcb9e442c150444;p=fs%2Flustre-release.git - move the osc rpc managing thread into the osc and rename it appropriately - osc_rpcd's set is internal to the osc instead of passed in - evolve the temporary 'barrier' api into the proper 'plug/unplug' - move the 'ocp' management helpers from llite into genops.c - start and stop osc rpc thread based on the presence of osc imports - give test_rbw/echo_client the ability to use plug/unplug - mirror some of the b_filterio test_brw/echo_client changes - add an osc_internal.h for the osc rpcd --- diff --git a/lustre/llite/rw24.c b/lustre/llite/rw24.c index e377928..d6099ec 100644 --- a/lustre/llite/rw24.c +++ b/lustre/llite/rw24.c @@ -251,7 +251,7 @@ void ll_complete_writepage_24(struct obd_client_page *ocp, int rc) page->index, page->index); LASSERT(rc == 0); #endif - ll_ocp_free(page); + ocp_free(page); unlock_page(page); } @@ -278,7 +278,7 @@ static int ll_writepage_24(struct page *page) obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME); - ocp = ll_ocp_alloc(page); + ocp = ocp_alloc(page); if (IS_ERR(ocp)) GOTO(out, rc = PTR_ERR(ocp)); @@ -286,12 +286,8 @@ static int ll_writepage_24(struct page *page) ocp->ocp_flag = OBD_BRW_CREATE|OBD_BRW_FROM_GRANT; rc = obd_brw_async_ocp(OBD_BRW_WRITE, exp, &oa, - ll_i2info(inode)->lli_smd, ocp, - ll_i2sbi(inode)->ll_lc.lc_set, NULL); - if (rc == 0) - rc = obd_brw_async_barrier(OBD_BRW_WRITE, exp, - ll_i2info(inode)->lli_smd, - ll_i2sbi(inode)->ll_lc.lc_set); + ll_i2info(inode)->lli_smd, ocp, NULL); + out: class_export_put(exp); RETURN(rc); diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h new file mode 100644 index 0000000..c064f88 --- /dev/null +++ b/lustre/osc/osc_internal.h @@ -0,0 +1,18 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2003 Cluster File Systems, Inc. + * + * This code is issued under the GNU General Public License. + * See the file COPYING in this distribution + */ + +#ifndef OSC_INTERNAL_H +#define OSC_INTERNAL_H + +/* osc/osc_rpcd.c */ +int osc_rpcd_addref(void); +int osc_rpcd_decref(void); +void osc_rpcd_add_req(struct ptlrpc_request *req); + +#endif /* OSC_INTERNAL_H */ diff --git a/lustre/osc/osc_rpcd.c b/lustre/osc/osc_rpcd.c new file mode 100644 index 0000000..f77e554 --- /dev/null +++ b/lustre/osc/osc_rpcd.c @@ -0,0 +1,228 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001-2003 Cluster File Systems, Inc. + * Author Peter Braam + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * For testing and management it is treated as an obd_device, + * although * it does not export a full OBD method table (the + * requests are coming * in over the wire, so object target modules + * do not have a full * method table.) + * + */ + +#define EXPORT_SYMTAB +#define DEBUG_SUBSYSTEM S_OSC + +#ifdef __KERNEL__ +# include +# include +# include +# include +# include +# if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) +# include +# include +# else +# include +# endif +#else /* __KERNEL__ */ +# include +#endif + +#include +#include /* for mds_objid */ +#include +#include +#include +#include +#include + +#ifndef __CYGWIN__ +# include +# include +#else +# include +#endif + +#include +#include /* for OBD_FAIL_CHECK */ +#include /* for PTL_MD_MAX_IOV */ +#include + +#define LIOD_STOP 0 +static struct osc_rpcd_ctl { + unsigned long orc_flags; + spinlock_t orc_lock; + struct completion orc_starting; + struct completion orc_finishing; + struct list_head orc_req_list; + wait_queue_head_t orc_waitq; + struct ptlrpc_request_set *orc_set; +} osc_orc; + +static DECLARE_MUTEX(osc_rpcd_sem); +static int osc_rpcd_users = 0; + +void osc_rpcd_add_req(struct ptlrpc_request *req) +{ + struct osc_rpcd_ctl *orc = &osc_orc; + + ptlrpc_set_add_new_req(orc->orc_set, req); + wake_up(&orc->orc_waitq); +} + +static int osc_rpcd_check(struct osc_rpcd_ctl *orc) +{ + struct list_head *tmp, *pos; + struct ptlrpc_request *req; + unsigned long flags; + int rc = 0; + ENTRY; + + if (test_bit(LIOD_STOP, &orc->orc_flags)) + RETURN(1); + + spin_lock_irqsave(&orc->orc_set->set_new_req_lock, flags); + list_for_each_safe(pos, tmp, &orc->orc_set->set_new_requests) { + req = list_entry(pos, struct ptlrpc_request, rq_set_chain); + list_del_init(&req->rq_set_chain); + ptlrpc_set_add_req(orc->orc_set, req); + } + spin_unlock_irqrestore(&orc->orc_set->set_new_req_lock, flags); + + if (orc->orc_set->set_remaining) { + rc = ptlrpc_check_set(orc->orc_set); + + /* XXX our set never completes, so we prune the completed + * reqs after each iteration. boy could this be smarter. */ + list_for_each_safe(pos, tmp, &orc->orc_set->set_requests) { + req = list_entry(pos, struct ptlrpc_request, + rq_set_chain); + if (req->rq_phase != RQ_PHASE_COMPLETE) + continue; + + list_del_init(&req->rq_set_chain); + req->rq_set = NULL; + ptlrpc_req_finished (req); + } + } + + RETURN(rc); +} + +/* ptlrpc's code paths like to execute in process context, so we have this + * thread which spins on a set which contains the io rpcs. llite specifies + * osc_rpcd's set when it pushes pages down into the oscs */ +static int osc_rpcd(void *arg) +{ + struct osc_rpcd_ctl *orc = arg; + unsigned long flags; + ENTRY; + + kportal_daemonize("liod_writeback"); +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0) + sigfillset(¤t->blocked); + recalc_sigpending(); +#else + spin_lock_irqsave(¤t->sigmask_lock, flags); + sigfillset(¤t->blocked); + recalc_sigpending(current); + spin_unlock_irqrestore(¤t->sigmask_lock, flags); +#endif + + complete(&orc->orc_starting); + + /* like kswapd */ + current->flags |= PF_MEMALLOC; + + /* this mainloop strongly resembles ptlrpc_set_wait except + * that our set never completes. osc_rpcd_check calls ptlrpc_check_set + * when there are requests in the set. new requests come in + * on the set's new_req_list and osc_rpcd_check moves them into + * the set. */ + while (1) { + wait_queue_t set_wait; + struct l_wait_info lwi; + int timeout; + + timeout = ptlrpc_set_next_timeout(orc->orc_set); + /* XXX the interrupted thing isn't really functional. */ + lwi = LWI_TIMEOUT_INTR(timeout * HZ, ptlrpc_expired_set, + ptlrpc_interrupted_set, orc->orc_set); + + /* ala the pinger, wait on orc's waitqueue and the set's */ + init_waitqueue_entry(&set_wait, current); + add_wait_queue(&orc->orc_set->set_waitq, &set_wait); + l_wait_event(orc->orc_waitq, osc_rpcd_check(orc), &lwi); + remove_wait_queue(&orc->orc_set->set_waitq, &set_wait); + + if (test_bit(LIOD_STOP, &orc->orc_flags)) + break; + } + /* XXX should be making sure we don't have anything in flight */ + complete(&orc->orc_finishing); + return 0; +} + +int osc_rpcd_addref(void) +{ + struct osc_rpcd_ctl *orc = &osc_orc; + int rc = 0; + ENTRY; + + down(&osc_rpcd_sem); + if (++osc_rpcd_users != 1) + GOTO(out, rc); + + memset(orc, 0, sizeof(*orc)); + init_completion(&orc->orc_starting); + init_completion(&orc->orc_finishing); + init_waitqueue_head(&orc->orc_waitq); + orc->orc_flags = 0; + spin_lock_init(&orc->orc_lock); + INIT_LIST_HEAD(&orc->orc_req_list); + + orc->orc_set = ptlrpc_prep_set(); + if (orc->orc_set == NULL) + GOTO(out, rc = -ENOMEM); + + if (kernel_thread(osc_rpcd, orc, 0) < 0) { + ptlrpc_set_destroy(orc->orc_set); + GOTO(out, rc = -ECHILD); + } + + wait_for_completion(&orc->orc_starting); +out: + up(&osc_rpcd_sem); + RETURN(rc); +} + +void osc_rpcd_decref(void) +{ + struct osc_rpcd_ctl *orc = &osc_orc; + + down(&osc_rpcd_sem); + if (--osc_rpcd_users == 0) { + set_bit(LIOD_STOP, &orc->orc_flags); + wake_up(&orc->orc_waitq); + wait_for_completion(&orc->orc_finishing); + ptlrpc_set_destroy(orc->orc_set); + } + up(&osc_rpcd_sem); +}