Whamcloud - gitweb
- added rq_type field to ptlrpc_request
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index 060c5c3..e45c6f9 100644 (file)
@@ -1,25 +1,35 @@
-/*
- *  ost/ost_handler.c
- *  Storage Target Handling functions
- *  
- *  Lustre Object Server Module (OST)
- * 
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ *   Author: Peter J. Braam <braam@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
  *
- *  This code is issued under the GNU General Public License.
- *  See the file COPYING in this distribution
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *  Storage Target Handling functions
+ *  Lustre Object Server Module (OST)
  *
- *  by Peter Braam <braam@clusterfs.com>
- * 
  *  This server is single threaded at present (but can easily be multi
  *  threaded). For testing and management it is treated as an
  *  obd_device, although it does not export a full OBD method table
  *  (the requests are coming in over the wire, so object target
  *  modules do not have a full method table.)
- * 
  */
 
-
 #define EXPORT_SYMTAB
 
 #include <linux/version.h>
@@ -30,6 +40,9 @@
 #include <linux/ext2_fs.h>
 #include <linux/quotaops.h>
 #include <asm/unistd.h>
+
+#define DEBUG_SUBSYSTEM S_OST
+
 #include <linux/obd_support.h>
 #include <linux/obd.h>
 #include <linux/obd_class.h>
@@ -55,7 +68,7 @@ static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
                return -ENOMEM;
        }
 
-       printk("---> OST at %d %p, incoming req %p, srv_req %p\n", 
+        CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n",
               __LINE__, ost, req, srv_req);
 
        memset(srv_req, 0, sizeof(*req)); 
@@ -68,7 +81,9 @@ static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
        /* remember where it came from */
        srv_req->rq_reply_handle = req;
 
+        spin_lock(&ost->ost_lock);
        list_add(&srv_req->rq_list, &ost->ost_reqs); 
+        spin_unlock(&ost->ost_lock);
        wake_up(&ost->ost_waitq);
        return 0;
 }
@@ -83,7 +98,8 @@ int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
                /* This is a request that came from the network via portals. */
 
                /* FIXME: we need to increment the count of handled events */
-               ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0);
+                req->rq_type = PTLRPC_REPLY;
+               ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL);
        } else {
                /* This is a local request that came from another thread. */
 
@@ -143,7 +159,7 @@ static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
        rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
                          &req->rq_replen, &req->rq_repbuf); 
        if (rc) { 
-               printk("ost_destroy: cannot pack reply\n"); 
+               CERROR("cannot pack reply\n"); 
                return rc;
        }
 
@@ -160,7 +176,6 @@ static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
        int rc;
 
        ENTRY;
-       printk("ost getattr entered\n"); 
        
        conn.oc_id = req->rq_req.ost->connid;
        conn.oc_dev = ost->ost_tgt;
@@ -168,7 +183,7 @@ static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
        rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
                          &req->rq_replen, &req->rq_repbuf); 
        if (rc) { 
-               printk("ost_getattr: cannot pack reply\n"); 
+               CERROR("cannot pack reply\n"); 
                return rc;
        }
        req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
@@ -194,7 +209,7 @@ static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
        rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
                          &req->rq_replen, &req->rq_repbuf); 
        if (rc) { 
-               printk("ost_create: cannot pack reply\n"); 
+               CERROR("cannot pack reply\n"); 
                return rc;
        }
 
@@ -207,6 +222,34 @@ static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
        return 0;
 }
 
+static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
+{
+       struct obd_conn conn; 
+       int rc;
+
+       ENTRY;
+       
+       conn.oc_id = req->rq_req.ost->connid;
+       conn.oc_dev = ost->ost_tgt;
+
+       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
+                         &req->rq_replen, &req->rq_repbuf); 
+       if (rc) { 
+               CERROR("cannot pack reply\n"); 
+               return rc;
+       }
+
+       memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
+
+       req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_punch
+               (&conn, &req->rq_rep.ost->oa, 
+                req->rq_rep.ost->oa.o_size,
+                req->rq_rep.ost->oa.o_blocks); 
+
+       EXIT;
+       return 0;
+}
+
 
 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
 {
@@ -221,7 +264,7 @@ static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
        rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
                          &req->rq_replen, &req->rq_repbuf); 
        if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
+               CERROR("cannot pack reply\n"); 
                return rc;
        }
 
@@ -247,13 +290,13 @@ static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
        rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
                          &req->rq_replen, &req->rq_repbuf); 
        if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
+               CERROR("cannot pack reply\n"); 
                return rc;
        }
 
        req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
 
-       printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf, 
+        CDEBUG(0, "rep buffer %p, id %d\n", req->rq_repbuf,
               conn.oc_id);
        req->rq_rep.ost->connid = conn.oc_id;
        EXIT;
@@ -273,7 +316,7 @@ static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
        rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
                          &req->rq_replen, &req->rq_repbuf); 
        if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
+               CERROR("cannot pack reply\n"); 
                return rc;
        }
 
@@ -300,10 +343,10 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
        req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
                (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val); 
 
-       rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
-                         &req->rq_replen, &req->rq_repbuf); 
+       rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
+                          &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); 
        if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
+               CERROR("cannot pack reply\n"); 
                return rc;
        }
 
@@ -340,7 +383,7 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
                          &req->rq_rephdr, &req->rq_rep.ost,
                          &req->rq_replen, &req->rq_repbuf); 
        if (rc) { 
-               printk("ost_create: cannot pack reply\n"); 
+               CERROR("cannot pack reply\n"); 
                return rc;
        }
        res = ost_rep_buf1(req->rq_rep.ost); 
@@ -364,22 +407,29 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
                (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
                 niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
 
-       if (cmd == OBD_BRW_WRITE) { 
-               for (i=0; i<niocount; i++) { 
+       if (req->rq_rep.ost->result) {
+               EXIT;
+               goto out;
+       }
+
+        if (cmd == OBD_BRW_WRITE) {
+                for (i = 0; i < niocount; i++) {
                        src = &((struct niobuf *)tmp2)[i];
                        dst = &((struct niobuf *)res)[i];
                        memcpy((void *)(unsigned long)dst->addr, 
                               (void *)(unsigned long)src->addr, 
                               src->len);
                }
+               barrier();
        } else { 
-               for (i=0; i<niocount; i++) { 
+                for (i = 0; i < niocount; i++) {
                        dst = &((struct niobuf *)tmp2)[i];
                        src = &((struct niobuf *)res)[i];
                        memcpy((void *)(unsigned long)dst->addr, 
                               (void *)(unsigned long)src->addr, 
                               PAGE_SIZE); 
                }
+               barrier();
        }
 
        req->rq_rep.ost->result = 
@@ -387,6 +437,7 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
                (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
                 niocount, (struct niobuf *)res); 
 
+ out:
        EXIT;
        return 0;
 }
@@ -398,11 +449,11 @@ int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
        struct ptlreq_hdr *hdr;
 
        ENTRY;
-       printk("ost_handle: req at %p\n", req); 
+        CDEBUG(0, "req at %p\n", req);
 
        hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
        if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
-               printk("lustre_ost: wrong packet type sent %d\n",
+               CERROR("lustre_ost: wrong packet type sent %d\n",
                       NTOH__u32(hdr->type));
                rc = -EINVAL;
                goto out;
@@ -411,7 +462,7 @@ int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
        rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
                            &req->rq_reqhdr, &req->rq_req.ost);
        if (rc) { 
-               printk("lustre_ost: Invalid request\n");
+               CERROR("lustre_ost: Invalid request\n");
                EXIT; 
                goto out;
        }
@@ -450,6 +501,10 @@ int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
                CDEBUG(D_INODE, "brw\n");
                rc = ost_brw(ost, req);
                break;
+       case OST_PUNCH:
+               CDEBUG(D_INODE, "punch\n");
+               rc = ost_punch(ost, req);
+               break;
        default:
                req->rq_status = -ENOTSUPP;
                return ost_error(obddev, req);
@@ -458,7 +513,7 @@ int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
 out:
        req->rq_status = rc;
        if (rc) { 
-               printk("ost: processing error %d\n", rc);
+               CERROR("ost: processing error %d\n", rc);
                ost_error(obddev, req);
        } else { 
                CDEBUG(D_INODE, "sending reply\n"); 
@@ -468,10 +523,14 @@ out:
        return 0;
 }
 
+/* FIXME: Serious refactoring needed */
 int ost_main(void *arg)
 {
+        int signal; 
        struct obd_device *obddev = (struct obd_device *) arg;
        struct ost_obd *ost = &obddev->u.ost;
+        DECLARE_WAITQUEUE(wait, current);
+
        ENTRY;
 
        lock_kernel();
@@ -493,59 +552,106 @@ int ost_main(void *arg)
        while (1) {
                int rc; 
 
-               if (ost->ost_flags & OST_EXIT)
-                       break;
+               if (ost->ost_service != NULL) {
+                       ptl_event_t ev;
+                        struct ptlrpc_request request;
+                        struct ptlrpc_service *service;
+
+                        CDEBUG(D_IOCTL, "-- sleeping\n");
+                        signal = 0;
+                        add_wait_queue(&ost->ost_waitq, &wait);
+                        while (1) {
+                                set_current_state(TASK_INTERRUPTIBLE);
+                                rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
+                                if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
+                                        break;
+                                if (ost->ost_flags & OST_EXIT)
+                                        break;
+
+
+                                /* if this process really wants to die,
+                                 * let it go */
+                                if (sigismember(&(current->pending.signal),
+                                                SIGKILL) ||
+                                    sigismember(&(current->pending.signal),
+                                                SIGINT)) {
+                                        signal = 1;
+                                        break;
+                                }
+
+                                schedule();
+                        }
+                        remove_wait_queue(&ost->ost_waitq, &wait);
+                        set_current_state(TASK_RUNNING);
+                        CDEBUG(D_IOCTL, "-- done\n");
+
+                        if (signal == 1) {
+                                /* We broke out because of a signal */
+                                EXIT;
+                                break;
+                        }
+                        if (ost->ost_flags & OST_EXIT) {
+                                EXIT;
+                                break;
+                        }
+
+                        service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
+
+                        /* FIXME: If we move to an event-driven model,
+                         * we should put the request on the stack of
+                         * mds_handle instead. */
+                        memset(&request, 0, sizeof(request));
+                        request.rq_reqbuf = ev.mem_desc.start + ev.offset;
+                        request.rq_reqlen = ev.mem_desc.length;
+                        request.rq_ost = ost;
+                        request.rq_xid = ev.match_bits;
+
+                        request.rq_peer.peer_nid = ev.initiator.nid;
+                        /* FIXME: this NI should be the incoming NI.
+                         * We don't know how to find that from here. */
+                        request.rq_peer.peer_ni =
+                                ost->ost_service->srv_self.peer_ni;
+                        rc = ost_handle(obddev, &request);
+
+                        /* Inform the rpc layer the event has been handled */
+                        ptl_received_rpc(service);
+               } else {
+                       struct ptlrpc_request *request;
 
-               wake_up(&ost->ost_done_waitq);
-               interruptible_sleep_on(&ost->ost_waitq);
+                        CDEBUG(D_IOCTL, "-- sleeping\n");
+                        add_wait_queue(&ost->ost_waitq, &wait);
+                        while (1) {
+                                spin_lock(&ost->ost_lock);
+                                if (!list_empty(&ost->ost_reqs))
+                                        break;
 
-               CDEBUG(D_INODE, "lustre_ost wakes\n");
-               CDEBUG(D_INODE, "pick up req here and continue\n"); 
+                                set_current_state(TASK_INTERRUPTIBLE);
 
+                                /* if this process really wants to die,
+                                 * let it go */
+                                if (sigismember(&(current->pending.signal),
+                                                SIGKILL) ||
+                                    sigismember(&(current->pending.signal),
+                                                SIGINT))
+                                        break;
 
-               if (ost->ost_service != NULL) {
-                       ptl_event_t ev;
+                                spin_unlock(&ost->ost_lock);
 
-                       while (1) {
-                               struct ptlrpc_request request;
-                               struct ptlrpc_service *service;
-
-                               rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
-                               if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
-                                       break;
-
-                               service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
-
-                               /* FIXME: If we move to an event-driven model,
-                                * we should put the request on the stack of
-                                * mds_handle instead. */
-                               memset(&request, 0, sizeof(request));
-                               request.rq_reqbuf = ev.mem_desc.start +
-                                       ev.offset;
-                               request.rq_reqlen = ev.mem_desc.length;
-                               request.rq_ost = ost;
-                               request.rq_xid = ev.match_bits;
-
-                               request.rq_peer.peer_nid = ev.initiator.nid;
-                               /* FIXME: this NI should be the incoming NI.
-                                * We don't know how to find that from here. */
-                               request.rq_peer.peer_ni =
-                                       ost->ost_service->srv_self.peer_ni;
-                               rc = ost_handle(obddev, &request);
-
-                               /* Inform the rpc layer the event has been handled */
-                                ptl_received_rpc(service);
-                       }
-               } else {
-                       struct ptlrpc_request *request;
+                                schedule();
+                        }
+                        remove_wait_queue(&ost->ost_waitq, &wait);
+                        set_current_state(TASK_RUNNING);
+                        CDEBUG(D_IOCTL, "-- done\n");
 
                        if (list_empty(&ost->ost_reqs)) { 
-                               CDEBUG(D_INODE, "woke because of timer\n"); 
-                       } else { 
+                               CDEBUG(D_INODE, "woke because of signal\n");
+                                spin_unlock(&ost->ost_lock);
+                       } else {
                                request = list_entry(ost->ost_reqs.next,
                                                     struct ptlrpc_request,
                                                     rq_list);
                                list_del(&request->rq_list);
+                                spin_unlock(&ost->ost_lock);
                                rc = ost_handle(obddev, request); 
                        }
                }
@@ -555,7 +661,7 @@ int ost_main(void *arg)
 
        ost->ost_thread = NULL;
        wake_up(&ost->ost_done_waitq);
-       printk("lustre_ost: exiting\n");
+       CERROR("lustre_ost: exiting\n");
        return 0;
 }
 
@@ -604,7 +710,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len,
        ost->ost_tgt = tgt;
         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
              ! (tgt->obd_flags & OBD_SET_UP) ){
-                printk("device not attached or not set up (%d)\n", 
+                CERROR("device not attached or not set up (%d)\n", 
                        data->ioc_dev);
                 EXIT;
                return -EINVAL;
@@ -613,7 +719,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len,
        ost->ost_conn.oc_dev = tgt;
        err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
        if (err) { 
-               printk("lustre ost: fail to connect to device %d\n", 
+               CERROR("lustre ost: fail to connect to device %d\n", 
                       data->ioc_dev); 
                return -EINVAL;
        }
@@ -658,7 +764,7 @@ static int ost_cleanup(struct obd_device * obddev)
         }
 
         if ( !list_empty(&obddev->obd_gen_clients) ) {
-                printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
+                CERROR("still has clients!\n");
                 EXIT;
                 return -EBUSY;
         }
@@ -675,7 +781,7 @@ static int ost_cleanup(struct obd_device * obddev)
        tgt = ost->ost_tgt;
        err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
        if (err) { 
-               printk("lustre ost: fail to disconnect device\n");
+               CERROR("lustre ost: fail to disconnect device\n");
                return -EINVAL;
        }