Whamcloud - gitweb
- New style waitqueues in ost and mds main
authorpschwan <pschwan>
Wed, 27 Feb 2002 05:41:04 +0000 (05:41 +0000)
committerpschwan <pschwan>
Wed, 27 Feb 2002 05:41:04 +0000 (05:41 +0000)
lustre/mds/handler.c
lustre/ost/ost_handler.c

index 5d7efd3..a6be005 100644 (file)
@@ -1,4 +1,6 @@
-/*
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
  *  linux/mds/handler.c
  *  
  *  Lustre Metadata Server (mds) request handler
@@ -74,7 +76,7 @@ static int mds_queue_req(struct ptlrpc_request *req)
 }
 
 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
-                   __u64 offset, struct niobuf *dst)
+                 __u64 offset, struct niobuf *dst)
 {
        int rc; 
        mm_segment_t oldfs = get_fs();
@@ -321,7 +323,7 @@ int mds_readpage(struct ptlrpc_request *req)
                req->rq_rephdr->status = PTR_ERR(file);
                return 0;
        }
-               
+
        niobuf = mds_req_tgt(req->rq_req.mds);
 
        /* to make this asynchronous make sure that the handling function 
@@ -427,6 +429,7 @@ int mds_main(void *arg)
 {
        struct mds_obd *mds = (struct mds_obd *) arg;
        struct timer_list timer;
+        DECLARE_WAITQUEUE(wait, current);
 
        lock_kernel();
        daemonize();
@@ -461,47 +464,96 @@ int mds_main(void *arg)
 
                if (mds->mds_service != NULL) {
                        ptl_event_t ev;
+                        struct ptlrpc_request request;
+                        struct ptlrpc_service *service;
 
-                       while (1) {
-                               struct ptlrpc_request request;
-                               struct ptlrpc_service *service;
 
+                        CDEBUG(D_IOCTL, "-- sleeping\n");
+                        add_wait_queue(&mds->mds_waitq, &wait);
+                        while (1) {
                                rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev);
-                               if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
-                                       break;
-                               
-                               service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;        
-
-                               /* FIXME: If we move to an event-driven model,
-                                * we should put the request on the stack of
-                                * mds_handle instead. */
-                               memset(&request, 0, sizeof(request));
-                               request.rq_reqbuf = ev.mem_desc.start +
-                                       ev.offset;
-                               request.rq_reqlen = ev.mem_desc.length;
-                               request.rq_obd = MDS;
-                               request.rq_xid = ev.match_bits;
-
-                               request.rq_peer.peer_nid = ev.initiator.nid;
-                               /* FIXME: this NI should be the incoming NI.
-                                * We don't know how to find that from here. */
-                               request.rq_peer.peer_ni =
-                                       mds->mds_service->srv_self.peer_ni;
-                               rc = mds_handle(&request);
-
-                               /* Inform the rpc layer the event has been handled */ 
-                               ptl_received_rpc(service);
-                       }
+                                if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
+                                        break;
+
+                                set_current_state(TASK_INTERRUPTIBLE);
+
+                                /* if this process really wants to die,
+                                 * let it go */
+                                if (sigismember(&(current->pending.signal),
+                                                SIGKILL) ||
+                                    sigismember(&(current->pending.signal),
+                                                SIGINT))
+                                        break;
+
+                                schedule();
+                        }
+                        remove_wait_queue(&mds->mds_waitq, &wait);
+                        set_current_state(TASK_RUNNING);
+                        CDEBUG(D_IOCTL, "-- done\n");
+
+                        if (rc == PTL_EQ_EMPTY) {
+                                /* We broke out because of a signal */
+                                EXIT;
+                                return -EINTR;
+                        }
+
+                        service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;       
+
+                        /* FIXME: If we move to an event-driven model,
+                         * we should put the request on the stack of
+                         * mds_handle instead. */
+                        memset(&request, 0, sizeof(request));
+                        request.rq_reqbuf = ev.mem_desc.start + ev.offset;
+                        request.rq_reqlen = ev.mem_desc.length;
+                        request.rq_obd = MDS;
+                        request.rq_xid = ev.match_bits;
+
+                        request.rq_peer.peer_nid = ev.initiator.nid;
+                        /* FIXME: this NI should be the incoming NI.
+                         * We don't know how to find that from here. */
+                        request.rq_peer.peer_ni =
+                                mds->mds_service->srv_self.peer_ni;
+                        rc = mds_handle(&request);
+
+                        /* Inform the rpc layer the event has been handled */ 
+                        ptl_received_rpc(service);
                } else {
                        struct ptlrpc_request *request;
 
+                        CDEBUG(D_IOCTL, "-- sleeping\n");
+                        add_wait_queue(&mds->mds_waitq, &wait);
+                        while (1) {
+                                spin_lock(&mds->mds_lock);
+                                if (!list_empty(&mds->mds_reqs))
+                                        break;
+
+                                set_current_state(TASK_INTERRUPTIBLE);
+
+                                /* if this process really wants to die,
+                                 * let it go */
+                                if (sigismember(&(current->pending.signal),
+                                                SIGKILL) ||
+                                    sigismember(&(current->pending.signal),
+                                                SIGINT))
+                                        break;
+
+                                spin_unlock(&mds->mds_lock);
+
+                                schedule();
+                        }
+                        remove_wait_queue(&mds->mds_waitq, &wait);
+                        set_current_state(TASK_RUNNING);
+                        CDEBUG(D_IOCTL, "-- done\n");
+
                        if (list_empty(&mds->mds_reqs)) {
-                               CDEBUG(D_INODE, "woke because of timer\n");
+                               CDEBUG(D_INODE, "woke because of signal\n");
+                                spin_unlock(&mds->mds_lock);
                        } else {
                                request = list_entry(mds->mds_reqs.next,
                                                     struct ptlrpc_request,
                                                     rq_list);
                                list_del(&request->rq_list);
+                                spin_unlock(&mds->mds_lock);
                                rc = mds_handle(request);
                        }
                }
index 1d4b93c..f665110 100644 (file)
@@ -1,22 +1,33 @@
-/*
- *  ost/ost_handler.c
- *  Storage Target Handling functions
- *  
- *  Lustre Object Server Module (OST)
- * 
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ *   Author: Peter J. Braam <braam@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
  *
- *  This code is issued under the GNU General Public License.
- *  See the file COPYING in this distribution
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *  Storage Target Handling functions
+ *  Lustre Object Server Module (OST)
  *
- *  by Peter Braam <braam@clusterfs.com>
- * 
  *  This server is single threaded at present (but can easily be multi
  *  threaded). For testing and management it is treated as an
  *  obd_device, although it does not export a full OBD method table
  *  (the requests are coming in over the wire, so object target
  *  modules do not have a full method table.)
- * 
  */
 
 #define EXPORT_SYMTAB
@@ -70,7 +81,9 @@ static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
        /* remember where it came from */
        srv_req->rq_reply_handle = req;
 
+        spin_lock(&ost->ost_lock);
        list_add(&srv_req->rq_list, &ost->ost_reqs); 
+        spin_unlock(&ost->ost_lock);
        wake_up(&ost->ost_waitq);
        return 0;
 }
@@ -398,8 +411,8 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
                goto out;
        }
 
-       if (cmd == OBD_BRW_WRITE) { 
-               for (i=0; i<niocount; i++) { 
+        if (cmd == OBD_BRW_WRITE) {
+                for (i = 0; i < niocount; i++) {
                        src = &((struct niobuf *)tmp2)[i];
                        dst = &((struct niobuf *)res)[i];
                        memcpy((void *)(unsigned long)dst->addr, 
@@ -408,7 +421,7 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
                }
                barrier();
        } else { 
-               for (i=0; i<niocount; i++) { 
+                for (i = 0; i < niocount; i++) {
                        dst = &((struct niobuf *)tmp2)[i];
                        src = &((struct niobuf *)res)[i];
                        memcpy((void *)(unsigned long)dst->addr, 
@@ -509,10 +522,13 @@ out:
        return 0;
 }
 
+/* FIXME: Serious refactoring needed */
 int ost_main(void *arg)
 {
        struct obd_device *obddev = (struct obd_device *) arg;
        struct ost_obd *ost = &obddev->u.ost;
+        DECLARE_WAITQUEUE(wait, current);
+
        ENTRY;
 
        lock_kernel();
@@ -537,54 +553,97 @@ int ost_main(void *arg)
                if (ost->ost_flags & OST_EXIT)
                        break;
 
-               wake_up(&ost->ost_done_waitq);
-               interruptible_sleep_on(&ost->ost_waitq);
-               barrier();
-               CDEBUG(D_INODE, "ost_wakes: pick up req here and continue\n"); 
-
-
                if (ost->ost_service != NULL) {
                        ptl_event_t ev;
-
-                       while (1) {
-                               struct ptlrpc_request request;
-                               struct ptlrpc_service *service;
-                               rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
-                               if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
-                                       break;
-
-                               service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
-
-                               /* FIXME: If we move to an event-driven model,
-                                * we should put the request on the stack of
-                                * mds_handle instead. */
-                               memset(&request, 0, sizeof(request));
-                               request.rq_reqbuf = ev.mem_desc.start +
-                                       ev.offset;
-                               request.rq_reqlen = ev.mem_desc.length;
-                               request.rq_ost = ost;
-                               request.rq_xid = ev.match_bits;
-
-                               request.rq_peer.peer_nid = ev.initiator.nid;
-                               /* FIXME: this NI should be the incoming NI.
-                                * We don't know how to find that from here. */
-                               request.rq_peer.peer_ni =
-                                       ost->ost_service->srv_self.peer_ni;
-                               rc = ost_handle(obddev, &request);
-
-                               /* Inform the rpc layer the event has been handled */
-                                ptl_received_rpc(service);
-                       }
+                        struct ptlrpc_request request;
+                        struct ptlrpc_service *service;
+
+                        CDEBUG(D_IOCTL, "-- sleeping\n");
+                        add_wait_queue(&ost->ost_waitq, &wait);
+                        while (1) {
+                                rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
+                                if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
+                                        break;
+
+                                set_current_state(TASK_INTERRUPTIBLE);
+
+                                /* if this process really wants to die,
+                                 * let it go */
+                                if (sigismember(&(current->pending.signal),
+                                                SIGKILL) ||
+                                    sigismember(&(current->pending.signal),
+                                                SIGINT))
+                                        break;
+
+                                schedule();
+                        }
+                        remove_wait_queue(&ost->ost_waitq, &wait);
+                        set_current_state(TASK_RUNNING);
+                        CDEBUG(D_IOCTL, "-- done\n");
+
+                        if (rc == PTL_EQ_EMPTY) {
+                                /* We broke out because of a signal */
+                                EXIT;
+                                return -EINTR;
+                        }
+
+                        service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
+
+                        /* FIXME: If we move to an event-driven model,
+                         * we should put the request on the stack of
+                         * mds_handle instead. */
+                        memset(&request, 0, sizeof(request));
+                        request.rq_reqbuf = ev.mem_desc.start + ev.offset;
+                        request.rq_reqlen = ev.mem_desc.length;
+                        request.rq_ost = ost;
+                        request.rq_xid = ev.match_bits;
+
+                        request.rq_peer.peer_nid = ev.initiator.nid;
+                        /* FIXME: this NI should be the incoming NI.
+                         * We don't know how to find that from here. */
+                        request.rq_peer.peer_ni =
+                                ost->ost_service->srv_self.peer_ni;
+                        rc = ost_handle(obddev, &request);
+
+                        /* Inform the rpc layer the event has been handled */
+                        ptl_received_rpc(service);
                } else {
                        struct ptlrpc_request *request;
 
+                        CDEBUG(D_IOCTL, "-- sleeping\n");
+                        add_wait_queue(&ost->ost_waitq, &wait);
+                        while (1) {
+                                spin_lock(&ost->ost_lock);
+                                if (!list_empty(&ost->ost_reqs))
+                                        break;
+
+                                set_current_state(TASK_INTERRUPTIBLE);
+
+                                /* if this process really wants to die,
+                                 * let it go */
+                                if (sigismember(&(current->pending.signal),
+                                                SIGKILL) ||
+                                    sigismember(&(current->pending.signal),
+                                                SIGINT))
+                                        break;
+
+                                spin_unlock(&ost->ost_lock);
+
+                                schedule();
+                        }
+                        remove_wait_queue(&ost->ost_waitq, &wait);
+                        set_current_state(TASK_RUNNING);
+                        CDEBUG(D_IOCTL, "-- done\n");
+
                        if (list_empty(&ost->ost_reqs)) { 
-                               CDEBUG(D_INODE, "woke because of timer\n"); 
-                       } else { 
+                               CDEBUG(D_INODE, "woke because of signal\n");
+                                spin_unlock(&ost->ost_req_lock);
+                       } else {
                                request = list_entry(ost->ost_reqs.next,
                                                     struct ptlrpc_request,
                                                     rq_list);
                                list_del(&request->rq_list);
+                                spin_unlock(&ost->ost_req_lock);
                                rc = ost_handle(obddev, request); 
                        }
                }