From a24ce1bf0f86ba8693b2b32153f2517f2c285e01 Mon Sep 17 00:00:00 2001 From: pschwan Date: Wed, 27 Feb 2002 05:41:04 +0000 Subject: [PATCH] - New style waitqueues in ost and mds main --- lustre/mds/handler.c | 116 +++++++++++++++++++++++---------- lustre/ost/ost_handler.c | 163 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 195 insertions(+), 84 deletions(-) diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 5d7efd3..a6be005 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1,4 +1,6 @@ -/* +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * * linux/mds/handler.c * * Lustre Metadata Server (mds) request handler @@ -74,7 +76,7 @@ static int mds_queue_req(struct ptlrpc_request *req) } int mds_sendpage(struct ptlrpc_request *req, struct file *file, - __u64 offset, struct niobuf *dst) + __u64 offset, struct niobuf *dst) { int rc; mm_segment_t oldfs = get_fs(); @@ -321,7 +323,7 @@ int mds_readpage(struct ptlrpc_request *req) req->rq_rephdr->status = PTR_ERR(file); return 0; } - + niobuf = mds_req_tgt(req->rq_req.mds); /* to make this asynchronous make sure that the handling function @@ -427,6 +429,7 @@ int mds_main(void *arg) { struct mds_obd *mds = (struct mds_obd *) arg; struct timer_list timer; + DECLARE_WAITQUEUE(wait, current); lock_kernel(); daemonize(); @@ -461,47 +464,96 @@ int mds_main(void *arg) if (mds->mds_service != NULL) { ptl_event_t ev; + struct ptlrpc_request request; + struct ptlrpc_service *service; - while (1) { - struct ptlrpc_request request; - struct ptlrpc_service *service; + CDEBUG(D_IOCTL, "-- sleeping\n"); + add_wait_queue(&mds->mds_waitq, &wait); + while (1) { rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev); - if (rc != PTL_OK && rc != PTL_EQ_DROPPED) - break; - - service = (struct ptlrpc_service *)ev.mem_desc.user_ptr; - - /* FIXME: If we move to an event-driven model, - * we should put the request on the stack of - * mds_handle instead. */ - memset(&request, 0, sizeof(request)); - request.rq_reqbuf = ev.mem_desc.start + - ev.offset; - request.rq_reqlen = ev.mem_desc.length; - request.rq_obd = MDS; - request.rq_xid = ev.match_bits; - - request.rq_peer.peer_nid = ev.initiator.nid; - /* FIXME: this NI should be the incoming NI. - * We don't know how to find that from here. */ - request.rq_peer.peer_ni = - mds->mds_service->srv_self.peer_ni; - rc = mds_handle(&request); - - /* Inform the rpc layer the event has been handled */ - ptl_received_rpc(service); - } + if (rc == PTL_OK || rc == PTL_EQ_DROPPED) + break; + + set_current_state(TASK_INTERRUPTIBLE); + + /* if this process really wants to die, + * let it go */ + if (sigismember(&(current->pending.signal), + SIGKILL) || + sigismember(&(current->pending.signal), + SIGINT)) + break; + + schedule(); + } + remove_wait_queue(&mds->mds_waitq, &wait); + set_current_state(TASK_RUNNING); + CDEBUG(D_IOCTL, "-- done\n"); + + if (rc == PTL_EQ_EMPTY) { + /* We broke out because of a signal */ + EXIT; + return -EINTR; + } + + service = (struct ptlrpc_service *)ev.mem_desc.user_ptr; + + /* FIXME: If we move to an event-driven model, + * we should put the request on the stack of + * mds_handle instead. */ + memset(&request, 0, sizeof(request)); + request.rq_reqbuf = ev.mem_desc.start + ev.offset; + request.rq_reqlen = ev.mem_desc.length; + request.rq_obd = MDS; + request.rq_xid = ev.match_bits; + + request.rq_peer.peer_nid = ev.initiator.nid; + /* FIXME: this NI should be the incoming NI. + * We don't know how to find that from here. */ + request.rq_peer.peer_ni = + mds->mds_service->srv_self.peer_ni; + rc = mds_handle(&request); + + /* Inform the rpc layer the event has been handled */ + ptl_received_rpc(service); } else { struct ptlrpc_request *request; + CDEBUG(D_IOCTL, "-- sleeping\n"); + add_wait_queue(&mds->mds_waitq, &wait); + while (1) { + spin_lock(&mds->mds_lock); + if (!list_empty(&mds->mds_reqs)) + break; + + set_current_state(TASK_INTERRUPTIBLE); + + /* if this process really wants to die, + * let it go */ + if (sigismember(&(current->pending.signal), + SIGKILL) || + sigismember(&(current->pending.signal), + SIGINT)) + break; + + spin_unlock(&mds->mds_lock); + + schedule(); + } + remove_wait_queue(&mds->mds_waitq, &wait); + set_current_state(TASK_RUNNING); + CDEBUG(D_IOCTL, "-- done\n"); + if (list_empty(&mds->mds_reqs)) { - CDEBUG(D_INODE, "woke because of timer\n"); + CDEBUG(D_INODE, "woke because of signal\n"); + spin_unlock(&mds->mds_lock); } else { request = list_entry(mds->mds_reqs.next, struct ptlrpc_request, rq_list); list_del(&request->rq_list); + spin_unlock(&mds->mds_lock); rc = mds_handle(request); } } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 1d4b93c..f665110 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -1,22 +1,33 @@ -/* - * ost/ost_handler.c - * Storage Target Handling functions - * - * Lustre Object Server Module (OST) - * +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * * Copyright (C) 2001, 2002 Cluster File Systems, Inc. + * Author: Peter J. Braam + * Author: Phil Schwan + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. * - * This code is issued under the GNU General Public License. - * See the file COPYING in this distribution + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * Storage Target Handling functions + * Lustre Object Server Module (OST) * - * by Peter Braam - * * This server is single threaded at present (but can easily be multi * threaded). For testing and management it is treated as an * obd_device, although it does not export a full OBD method table * (the requests are coming in over the wire, so object target * modules do not have a full method table.) - * */ #define EXPORT_SYMTAB @@ -70,7 +81,9 @@ static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req) /* remember where it came from */ srv_req->rq_reply_handle = req; + spin_lock(&ost->ost_lock); list_add(&srv_req->rq_list, &ost->ost_reqs); + spin_unlock(&ost->ost_lock); wake_up(&ost->ost_waitq); return 0; } @@ -398,8 +411,8 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) goto out; } - if (cmd == OBD_BRW_WRITE) { - for (i=0; iaddr, @@ -408,7 +421,7 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) } barrier(); } else { - for (i=0; iaddr, @@ -509,10 +522,13 @@ out: return 0; } +/* FIXME: Serious refactoring needed */ int ost_main(void *arg) { struct obd_device *obddev = (struct obd_device *) arg; struct ost_obd *ost = &obddev->u.ost; + DECLARE_WAITQUEUE(wait, current); + ENTRY; lock_kernel(); @@ -537,54 +553,97 @@ int ost_main(void *arg) if (ost->ost_flags & OST_EXIT) break; - wake_up(&ost->ost_done_waitq); - interruptible_sleep_on(&ost->ost_waitq); - barrier(); - CDEBUG(D_INODE, "ost_wakes: pick up req here and continue\n"); - - if (ost->ost_service != NULL) { ptl_event_t ev; - - while (1) { - struct ptlrpc_request request; - struct ptlrpc_service *service; - rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev); - if (rc != PTL_OK && rc != PTL_EQ_DROPPED) - break; - - service = (struct ptlrpc_service *)ev.mem_desc.user_ptr; - - /* FIXME: If we move to an event-driven model, - * we should put the request on the stack of - * mds_handle instead. */ - memset(&request, 0, sizeof(request)); - request.rq_reqbuf = ev.mem_desc.start + - ev.offset; - request.rq_reqlen = ev.mem_desc.length; - request.rq_ost = ost; - request.rq_xid = ev.match_bits; - - request.rq_peer.peer_nid = ev.initiator.nid; - /* FIXME: this NI should be the incoming NI. - * We don't know how to find that from here. */ - request.rq_peer.peer_ni = - ost->ost_service->srv_self.peer_ni; - rc = ost_handle(obddev, &request); - - /* Inform the rpc layer the event has been handled */ - ptl_received_rpc(service); - } + struct ptlrpc_request request; + struct ptlrpc_service *service; + + CDEBUG(D_IOCTL, "-- sleeping\n"); + add_wait_queue(&ost->ost_waitq, &wait); + while (1) { + rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev); + if (rc == PTL_OK || rc == PTL_EQ_DROPPED) + break; + + set_current_state(TASK_INTERRUPTIBLE); + + /* if this process really wants to die, + * let it go */ + if (sigismember(&(current->pending.signal), + SIGKILL) || + sigismember(&(current->pending.signal), + SIGINT)) + break; + + schedule(); + } + remove_wait_queue(&ost->ost_waitq, &wait); + set_current_state(TASK_RUNNING); + CDEBUG(D_IOCTL, "-- done\n"); + + if (rc == PTL_EQ_EMPTY) { + /* We broke out because of a signal */ + EXIT; + return -EINTR; + } + + service = (struct ptlrpc_service *)ev.mem_desc.user_ptr; + + /* FIXME: If we move to an event-driven model, + * we should put the request on the stack of + * mds_handle instead. */ + memset(&request, 0, sizeof(request)); + request.rq_reqbuf = ev.mem_desc.start + ev.offset; + request.rq_reqlen = ev.mem_desc.length; + request.rq_ost = ost; + request.rq_xid = ev.match_bits; + + request.rq_peer.peer_nid = ev.initiator.nid; + /* FIXME: this NI should be the incoming NI. + * We don't know how to find that from here. */ + request.rq_peer.peer_ni = + ost->ost_service->srv_self.peer_ni; + rc = ost_handle(obddev, &request); + + /* Inform the rpc layer the event has been handled */ + ptl_received_rpc(service); } else { struct ptlrpc_request *request; + CDEBUG(D_IOCTL, "-- sleeping\n"); + add_wait_queue(&ost->ost_waitq, &wait); + while (1) { + spin_lock(&ost->ost_lock); + if (!list_empty(&ost->ost_reqs)) + break; + + set_current_state(TASK_INTERRUPTIBLE); + + /* if this process really wants to die, + * let it go */ + if (sigismember(&(current->pending.signal), + SIGKILL) || + sigismember(&(current->pending.signal), + SIGINT)) + break; + + spin_unlock(&ost->ost_lock); + + schedule(); + } + remove_wait_queue(&ost->ost_waitq, &wait); + set_current_state(TASK_RUNNING); + CDEBUG(D_IOCTL, "-- done\n"); + if (list_empty(&ost->ost_reqs)) { - CDEBUG(D_INODE, "woke because of timer\n"); - } else { + CDEBUG(D_INODE, "woke because of signal\n"); + spin_unlock(&ost->ost_req_lock); + } else { request = list_entry(ost->ost_reqs.next, struct ptlrpc_request, rq_list); list_del(&request->rq_list); + spin_unlock(&ost->ost_req_lock); rc = ost_handle(obddev, request); } } -- 1.8.3.1