Whamcloud - gitweb
- move the osc rpc managing thread into the osc and rename it appropriately
authorzab <zab>
Fri, 25 Jul 2003 22:13:12 +0000 (22:13 +0000)
committerzab <zab>
Fri, 25 Jul 2003 22:13:12 +0000 (22:13 +0000)
- osc_rpcd's set is internal to the osc instead of passed in
- evolve the temporary 'barrier' api into the proper 'plug/unplug'
- move the 'ocp' management helpers from llite into genops.c
- start and stop osc rpc thread based on the presence of osc imports
- give test_rbw/echo_client the ability to use plug/unplug
- mirror some of the b_filterio test_brw/echo_client changes
- add an osc_internal.h for the osc rpcd

lustre/llite/rw24.c
lustre/osc/osc_internal.h [new file with mode: 0644]
lustre/osc/osc_rpcd.c [new file with mode: 0644]

index e377928..d6099ec 100644 (file)
@@ -251,7 +251,7 @@ void ll_complete_writepage_24(struct obd_client_page *ocp, int rc)
                                   page->index, page->index);
         LASSERT(rc == 0);
 #endif
-        ll_ocp_free(page);
+        ocp_free(page);
 
         unlock_page(page);
 }
@@ -278,7 +278,7 @@ static int ll_writepage_24(struct page *page)
         obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                     OBD_MD_FLMTIME | OBD_MD_FLCTIME);
 
-        ocp = ll_ocp_alloc(page);
+        ocp = ocp_alloc(page);
         if (IS_ERR(ocp)) 
                 GOTO(out, rc = PTR_ERR(ocp));
 
@@ -286,12 +286,8 @@ static int ll_writepage_24(struct page *page)
         ocp->ocp_flag = OBD_BRW_CREATE|OBD_BRW_FROM_GRANT;
 
         rc = obd_brw_async_ocp(OBD_BRW_WRITE, exp, &oa, 
-                               ll_i2info(inode)->lli_smd, ocp,
-                               ll_i2sbi(inode)->ll_lc.lc_set, NULL);
-        if (rc == 0)
-                rc = obd_brw_async_barrier(OBD_BRW_WRITE, exp, 
-                                           ll_i2info(inode)->lli_smd,
-                                           ll_i2sbi(inode)->ll_lc.lc_set);
+                               ll_i2info(inode)->lli_smd, ocp, NULL);
+
 out:
         class_export_put(exp);
         RETURN(rc);
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h
new file mode 100644 (file)
index 0000000..c064f88
--- /dev/null
@@ -0,0 +1,18 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2003 Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ */
+
+#ifndef OSC_INTERNAL_H
+#define OSC_INTERNAL_H
+
+/* osc/osc_rpcd.c */
+int osc_rpcd_addref(void);
+int osc_rpcd_decref(void);
+void osc_rpcd_add_req(struct ptlrpc_request *req);
+
+#endif /* OSC_INTERNAL_H */
diff --git a/lustre/osc/osc_rpcd.c b/lustre/osc/osc_rpcd.c
new file mode 100644 (file)
index 0000000..f77e554
--- /dev/null
@@ -0,0 +1,228 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ *   Author Peter Braam <braam@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *  For testing and management it is treated as an obd_device,
+ *  although * it does not export a full OBD method table (the
+ *  requests are coming * in over the wire, so object target modules
+ *  do not have a full * method table.)
+ *
+ */
+
+#define EXPORT_SYMTAB
+#define DEBUG_SUBSYSTEM S_OSC
+
+#ifdef __KERNEL__
+# include <linux/version.h>
+# include <linux/module.h>
+# include <linux/mm.h>
+# include <linux/highmem.h>
+# include <linux/lustre_dlm.h>
+# if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+#  include <linux/workqueue.h>
+#  include <linux/smp_lock.h>
+# else
+#  include <linux/locks.h>
+# endif
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
+
+#include <linux/kp30.h>
+#include <linux/lustre_mds.h> /* for mds_objid */
+#include <linux/lustre_net.h>
+#include <linux/lustre_otree.h>
+#include <linux/obd_ost.h>
+#include <linux/lustre_commit_confd.h>
+#include <linux/obd_lov.h>
+
+#ifndef  __CYGWIN__
+# include <linux/ctype.h>
+# include <linux/init.h>
+#else
+# include <ctype.h>
+#endif
+
+#include <linux/lustre_ha.h>
+#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
+#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
+#include <linux/lprocfs_status.h>
+
+#define LIOD_STOP 0
+static struct osc_rpcd_ctl {
+        unsigned long             orc_flags;
+        spinlock_t                orc_lock;
+        struct completion         orc_starting;
+        struct completion         orc_finishing;
+        struct list_head          orc_req_list;
+        wait_queue_head_t         orc_waitq;
+        struct ptlrpc_request_set *orc_set;
+} osc_orc;
+
+static DECLARE_MUTEX(osc_rpcd_sem);
+static int osc_rpcd_users = 0;
+
+void osc_rpcd_add_req(struct ptlrpc_request *req)
+{
+        struct osc_rpcd_ctl *orc = &osc_orc;
+
+        ptlrpc_set_add_new_req(orc->orc_set, req);
+        wake_up(&orc->orc_waitq);
+}
+
+static int osc_rpcd_check(struct osc_rpcd_ctl *orc)
+{
+        struct list_head *tmp, *pos;
+        struct ptlrpc_request *req;
+        unsigned long flags;
+        int rc = 0;
+        ENTRY;
+
+        if (test_bit(LIOD_STOP, &orc->orc_flags))
+                RETURN(1);
+
+        spin_lock_irqsave(&orc->orc_set->set_new_req_lock, flags);
+        list_for_each_safe(pos, tmp, &orc->orc_set->set_new_requests) {
+                req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
+                list_del_init(&req->rq_set_chain);
+                ptlrpc_set_add_req(orc->orc_set, req);
+        }
+        spin_unlock_irqrestore(&orc->orc_set->set_new_req_lock, flags);
+
+        if (orc->orc_set->set_remaining) {
+                rc = ptlrpc_check_set(orc->orc_set);
+
+                /* XXX our set never completes, so we prune the completed
+                 * reqs after each iteration. boy could this be smarter. */
+                list_for_each_safe(pos, tmp, &orc->orc_set->set_requests) {
+                        req = list_entry(pos, struct ptlrpc_request, 
+                                         rq_set_chain);
+                        if (req->rq_phase != RQ_PHASE_COMPLETE)
+                                continue;
+
+                        list_del_init(&req->rq_set_chain);
+                        req->rq_set = NULL;
+                        ptlrpc_req_finished (req);
+                }
+        }
+
+        RETURN(rc);
+}
+             
+/* ptlrpc's code paths like to execute in process context, so we have this
+ * thread which spins on a set which contains the io rpcs.  llite specifies
+ * osc_rpcd's set when it pushes pages down into the oscs */
+static int osc_rpcd(void *arg)
+{
+        struct osc_rpcd_ctl *orc = arg;
+        unsigned long flags;
+        ENTRY;
+
+        kportal_daemonize("liod_writeback");
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
+        sigfillset(&current->blocked);
+        recalc_sigpending();
+#else
+        spin_lock_irqsave(&current->sigmask_lock, flags);
+        sigfillset(&current->blocked);
+        recalc_sigpending(current);
+        spin_unlock_irqrestore(&current->sigmask_lock, flags);
+#endif
+
+        complete(&orc->orc_starting);
+
+        /* like kswapd */
+        current->flags |= PF_MEMALLOC;
+
+        /* this mainloop strongly resembles ptlrpc_set_wait except
+         * that our set never completes.  osc_rpcd_check calls ptlrpc_check_set
+         * when there are requests in the set.  new requests come in
+         * on the set's new_req_list and osc_rpcd_check moves them into
+         * the set. */
+        while (1) {
+                wait_queue_t set_wait;
+                struct l_wait_info lwi;
+                int timeout;
+
+                timeout = ptlrpc_set_next_timeout(orc->orc_set);
+                /* XXX the interrupted thing isn't really functional. */
+                lwi = LWI_TIMEOUT_INTR(timeout * HZ, ptlrpc_expired_set,
+                                       ptlrpc_interrupted_set, orc->orc_set);
+
+                /* ala the pinger, wait on orc's waitqueue and the set's */
+                init_waitqueue_entry(&set_wait, current);
+                add_wait_queue(&orc->orc_set->set_waitq, &set_wait);
+                l_wait_event(orc->orc_waitq, osc_rpcd_check(orc), &lwi);
+                remove_wait_queue(&orc->orc_set->set_waitq, &set_wait);
+
+                if (test_bit(LIOD_STOP, &orc->orc_flags))
+                        break;
+        }
+        /* XXX should be making sure we don't have anything in flight */
+        complete(&orc->orc_finishing);
+        return 0;
+}
+
+int osc_rpcd_addref(void)
+{
+        struct osc_rpcd_ctl *orc = &osc_orc;
+        int rc = 0;
+        ENTRY;
+
+        down(&osc_rpcd_sem);
+        if (++osc_rpcd_users != 1)
+                GOTO(out, rc);
+
+        memset(orc, 0, sizeof(*orc));
+        init_completion(&orc->orc_starting);
+        init_completion(&orc->orc_finishing);
+        init_waitqueue_head(&orc->orc_waitq);
+        orc->orc_flags = 0;
+        spin_lock_init(&orc->orc_lock);
+        INIT_LIST_HEAD(&orc->orc_req_list);
+
+        orc->orc_set = ptlrpc_prep_set();
+        if (orc->orc_set == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        if (kernel_thread(osc_rpcd, orc, 0) < 0)  {
+                ptlrpc_set_destroy(orc->orc_set);
+                GOTO(out, rc = -ECHILD);
+        }
+
+        wait_for_completion(&orc->orc_starting);
+out:
+        up(&osc_rpcd_sem);
+        RETURN(rc);
+}
+
+void osc_rpcd_decref(void)
+{
+        struct osc_rpcd_ctl *orc = &osc_orc;
+
+        down(&osc_rpcd_sem);
+        if (--osc_rpcd_users == 0) {
+                set_bit(LIOD_STOP, &orc->orc_flags);
+                wake_up(&orc->orc_waitq);
+                wait_for_completion(&orc->orc_finishing);
+                ptlrpc_set_destroy(orc->orc_set);
+        }
+        up(&osc_rpcd_sem);
+}