Whamcloud - gitweb
Remove no-longer-needed inode operations (they previously had extN EA VFS
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index 51468c8..67ca61a 100644 (file)
-/*
- *  linux/mds/handler.c
- *  
- *  Lustre Object Server Module (OST)
- * 
- *  Copyright (C) 2001  Cluster File Systems, Inc.
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ *   Author: Peter J. Braam <braam@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
  *
- *  This code is issued under the GNU General Public License.
- *  See the file COPYING in this distribution
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *  Storage Target Handling functions
+ *  Lustre Object Server Module (OST)
  *
- *  by Peter Braam <braam@clusterfs.com>
- * 
- *  This server is single threaded at present (but can easily be multi threaded). 
- *  For testing and management it is treated as an obd_device, although it does
- *  not export a full OBD method table (the requests are coming in over the wire, 
- *  so object target modules do not have a full method table.)
- * 
+ *  This server is single threaded at present (but can easily be multi
+ *  threaded). For testing and management it is treated as an
+ *  obd_device, although it does not export a full OBD method table
+ *  (the requests are coming in over the wire, so object target
+ *  modules do not have a full method table.)
  */
 
-
 #define EXPORT_SYMTAB
+#define DEBUG_SUBSYSTEM S_OST
 
-#include <linux/version.h>
 #include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/ext2_fs.h>
-#include <linux/quotaops.h>
-#include <asm/unistd.h>
-#include <linux/obd_support.h>
-#include <linux/obd.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_idl.h>
-#include <linux/lustre_mds.h>
-#include <linux/obd_class.h>
-
-// for testing
-static int ost_queue_req(struct obd_device *obddev, struct ost_request *req)
+#include <linux/obd_ost.h>
+#include <linux/lustre_net.h>
+
+static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
 {
-       struct ost_request *srv_req; 
-       struct ost_obd *ost = &obddev->u.ost;
-       
-       if (!ost) { 
-               EXIT;
-               return -1;
-       }
-
-       srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL); 
-       if (!srv_req) { 
-               EXIT;
-               return -ENOMEM;
-       }
-
-       printk("---> OST at %d %p, incoming req %p, srv_req %p\n", 
-              __LINE__, ost, req, srv_req);
-
-       memset(srv_req, 0, sizeof(*req)); 
-
-       /* move the request buffer */
-       srv_req->rq_reqbuf = req->rq_reqbuf;
-       srv_req->rq_reqlen    = req->rq_reqlen;
-       srv_req->rq_obd = ost;
-
-       /* remember where it came from */
-       srv_req->rq_reply_handle = req;
-
-       list_add(&srv_req->rq_list, &ost->ost_reqs); 
-       wake_up(&ost->ost_waitq);
-       return 0;
-}
+        struct obd_conn conn;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        conn.oc_id = body->connid;
+        conn.oc_dev = ost->ost_tgt;
 
-/* XXX replace with networking code */
-int ost_reply(struct obd_device *obddev, struct ost_request *req)
-{
-       struct ost_request *clnt_req = req->rq_reply_handle;
-
-       ENTRY;
-       printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req); 
-
-       /* free the request buffer */
-       kfree(req->rq_reqbuf);
-       req->rq_reqbuf = NULL; 
-       
-       /* move the reply to the client */ 
-       clnt_req->rq_replen = req->rq_replen;
-       clnt_req->rq_repbuf = req->rq_repbuf;
-
-       printk("---> client req %p repbuf %p len %d status %d\n", 
-              clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen, 
-              req->rq_rephdr->status); 
-
-       req->rq_repbuf = NULL;
-       req->rq_replen = 0;
-       
-       /* free the server request */
-       kfree(req); 
-       /* wake up the client */ 
-       wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
-       EXIT;
-       return 0;
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        req->rq_status = obd_destroy(&conn, &body->oa);
+        RETURN(0);
 }
 
-int ost_error(struct obd_device *obddev, struct ost_request *req)
+static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
 {
-       struct ost_rep_hdr *hdr;
-
-       ENTRY;
-       hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
-       if (!hdr) { 
-               EXIT;
-               return -ENOMEM;
-       }
-
-       memset(hdr, 0, sizeof(*hdr));
-       
-       hdr->seqno = req->rq_reqhdr->seqno;
-       hdr->status = req->rq_status; 
-       hdr->type = OST_TYPE_ERR;
-
-       req->rq_repbuf = (char *)hdr;
-       req->rq_replen = sizeof(*hdr); 
-
-       EXIT;
-       return ost_reply(obddev, req);
+        struct obd_conn conn;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        conn.oc_id = body->connid;
+        conn.oc_dev = ost->ost_tgt;
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_getattr(&conn, &repbody->oa);
+        RETURN(0);
 }
 
-static int ost_destroy(struct ost_obd *ost, struct ost_request *req)
+static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
-
-       ENTRY;
-       
-       conn.oc_id = req->rq_req->connid;
-       conn.oc_dev = ost->ost_tgt;
-
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_destroy: cannot pack reply\n"); 
-               return rc;
-       }
-
-       req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
-               (&conn, &req->rq_req->oa); 
-
-       EXIT;
-       return 0;
+        struct obd_conn conn;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        conn.oc_id = body->connid;
+        conn.oc_dev = ost->ost_tgt;
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_open(&conn, &repbody->oa);
+        RETURN(0);
 }
 
-static int ost_getattr(struct ost_obd *ost, struct ost_request *req)
+static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
-
-       ENTRY;
-       printk("ost getattr entered\n"); 
-       
-       conn.oc_id = req->rq_req->connid;
-       conn.oc_dev = ost->ost_tgt;
-
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_getattr: cannot pack reply\n"); 
-               return rc;
-       }
-       req->rq_rep->oa.o_id = req->rq_req->oa.o_id;
-       req->rq_rep->oa.o_valid = req->rq_req->oa.o_valid;
-
-       req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
-               (&conn, &req->rq_rep->oa); 
-
-       EXIT;
-       return 0;
+        struct obd_conn conn;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        conn.oc_id = body->connid;
+        conn.oc_dev = ost->ost_tgt;
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_close(&conn, &repbody->oa);
+        RETURN(0);
 }
 
-static int ost_create(struct ost_obd *ost, struct ost_request *req)
+static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
+        struct obd_conn conn;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       ENTRY;
-       
-       conn.oc_id = req->rq_req->connid;
-       conn.oc_dev = ost->ost_tgt;
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        conn.oc_id = body->connid;
+        conn.oc_dev = ost->ost_tgt;
 
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_create: cannot pack reply\n"); 
-               return rc;
-       }
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
 
-       memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_create(&conn, &repbody->oa);
+        RETURN(0);
+}
 
-       req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create
-               (&conn, &req->rq_rep->oa); 
+static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
+{
+        struct obd_conn conn;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       EXIT;
-       return 0;
-}
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        conn.oc_id = body->connid;
+        conn.oc_dev = ost->ost_tgt;
 
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
 
-static int ost_setattr(struct ost_obd *ost, struct ost_request *req)
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_punch(&conn, &repbody->oa,
+                                   repbody->oa.o_size, repbody->oa.o_blocks);
+        RETURN(0);
+}
+
+static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
+        struct obd_conn conn;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       ENTRY;
-       
-       conn.oc_id = req->rq_req->connid;
-       conn.oc_dev = ost->ost_tgt;
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        conn.oc_id = body->connid;
+        conn.oc_dev = ost->ost_tgt;
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_setattr(&conn, &repbody->oa);
+        RETURN(0);
+}
+
+static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
+{
+        struct obd_conn conn;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
-               return rc;
-       }
+        conn.oc_dev = ost->ost_tgt;
 
-       memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
 
-       req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
-               (&conn, &req->rq_rep->oa); 
+        req->rq_status = obd_connect(&conn);
 
-       EXIT;
-       return 0;
+        CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
+        body = lustre_msg_buf(req->rq_repmsg, 0);
+        body->connid = conn.oc_id;
+        RETURN(0);
 }
 
-static int ost_connect(struct ost_obd *ost, struct ost_request *req)
+static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
-
-       ENTRY;
-       
-       conn.oc_dev = ost->ost_tgt;
-
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
-               return rc;
-       }
-
-       req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
-
-       printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf, 
-              conn.oc_id);
-       req->rq_rep->connid = conn.oc_id;
-       EXIT;
-       return 0;
-}
+        struct obd_conn conn;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        conn.oc_id = body->connid;
+        conn.oc_dev = ost->ost_tgt;
 
-static int ost_disconnect(struct ost_obd *ost, struct ost_request *req)
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
+        req->rq_status = obd_disconnect(&conn);
+        RETURN(0);
+}
+
+static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
+        struct obd_conn conn;
+        struct ost_body *body;
+        int rc, size[2] = {sizeof(*body)};
+        char *bufs[2] = {NULL, NULL}, *ptr;
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        conn.oc_id = body->connid;
+        conn.oc_dev = ost->ost_tgt;
 
-       ENTRY;
-       
-       conn.oc_dev = ost->ost_tgt;
-       conn.oc_id = req->rq_req->connid;
+        ptr = lustre_msg_buf(req->rq_reqmsg, 1);
+        if (!ptr)
+                RETURN(-EINVAL);
 
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
-               return rc;
-       }
+        req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr,
+                                      &(size[1]), (void **)&(bufs[1]));
 
-       req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
+        rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                CERROR("cannot pack reply\n");
 
-       EXIT;
-       return 0;
+        RETURN(rc);
 }
 
-static int ost_get_info(struct ost_obd *ost, struct ost_request *req)
+static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
-       int vallen;
-       void *val;
+        struct ptlrpc_bulk_desc *desc;
+        struct obd_conn conn;
+        void *tmp1, *tmp2, *end2;
+        struct niobuf_remote *remote_nb;
+        struct niobuf_local *local_nb = NULL;
+        struct obd_ioobj *ioo;
+        struct ost_body *body;
+        int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+        end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+        objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
+        niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
+        cmd = body->data;
+
+        conn.oc_id = body->connid;
+        conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+
+        for (i = 0; i < objcount; i++) {
+                ost_unpack_ioo(&tmp1, &ioo);
+                if (tmp2 + ioo->ioo_bufcnt > end2) {
+                        LBUG();
+                        GOTO(out, rc = -EFAULT);
+                }
+                for (j = 0; j < ioo->ioo_bufcnt; j++)
+                        ost_unpack_niobuf(&tmp2, &remote_nb);
+        }
 
-       ENTRY;
-       
-       conn.oc_id = req->rq_req->connid;
-       conn.oc_dev = ost->ost_tgt;
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+        OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
+        if (local_nb == NULL)
+                RETURN(-ENOMEM);
+
+        /* The unpackers move tmp1 and tmp2, so reset them before using */
+        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+        req->rq_status = obd_preprw(cmd, &conn, objcount,
+                                    tmp1, niocount, tmp2, local_nb);
+
+        if (req->rq_status)
+                GOTO(out_local, 0);
+
+        desc = ptlrpc_prep_bulk(req->rq_connection);
+        if (desc == NULL)
+                GOTO(out_local, rc = -ENOMEM);
+        desc->b_portal = OST_BULK_PORTAL;
+
+        for (i = 0; i < niocount; i++) {
+                struct ptlrpc_bulk_page *bulk;
+                bulk = ptlrpc_prep_bulk_page(desc);
+                if (bulk == NULL)
+                        GOTO(out_bulk, rc = -ENOMEM);
+                remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
+                bulk->b_xid = remote_nb->xid;
+                bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
+                bulk->b_buflen = PAGE_SIZE;
+        }
+
+        rc = ptlrpc_send_bulk(desc);
+        if (rc)
+                GOTO(out_bulk, rc);
 
-       req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
-               (&conn, req->rq_req->buflen1, req->rq_req->buf1, &vallen, &val); 
+        ptlrpc_free_bulk(desc);
 
+        /* The unpackers move tmp1 and tmp2, so reset them before using */
+        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+        req->rq_status = obd_commitrw(cmd, &conn, objcount,
+                                      tmp1, niocount, local_nb);
 
-       rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
-               return rc;
-       }
+        RETURN(rc);
 
-       EXIT;
-       return 0;
+ out_bulk:
+        ptlrpc_free_bulk(desc);
+ out_local:
+        if (local_nb != NULL)
+                OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
+ out:
+        return 0;
 }
 
+static int ost_commit_page(struct obd_conn *conn, struct page *page)
+{
+        struct obd_ioobj obj;
+        struct niobuf_local buf;
+        int rc;
+        ENTRY;
 
+        memset(&buf, 0, sizeof(buf));
+        memset(&obj, 0, sizeof(obj));
 
-//int ost_handle(struct ost_conn *conn, int len, char *buf)
-int ost_handle(struct obd_device *obddev, struct ost_request *req)
+        buf.page = page;
+        obj.ioo_bufcnt = 1;
+
+        rc = obd_commitrw(OBD_BRW_WRITE, conn, 1, &obj, 1, &buf);
+        RETURN(rc);
+}
+
+static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
 {
-       int rc;
-       struct ost_obd *ost = &obddev->u.ost;
-       struct ost_req_hdr *hdr;
-
-       ENTRY;
-       printk("ost_handle: req at %p\n", req); 
-
-       hdr = (struct ost_req_hdr *)req->rq_reqbuf;
-       if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
-               printk("lustre_ost: wrong packet type sent %d\n",
-                      NTOH__u32(hdr->type));
-               rc = -EINVAL;
-               goto out;
-       }
-
-       rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
-                           &req->rq_reqhdr, &req->rq_req);
-       if (rc) { 
-               printk("lustre_ost: Invalid request\n");
-               EXIT; 
-               goto out;
-       }
-
-       switch (req->rq_reqhdr->opc) { 
-
-       case OST_CONNECT:
-               CDEBUG(D_INODE, "connect\n");
-               printk("----> connect \n"); 
-               rc = ost_connect(ost, req);
-               break;
-       case OST_DISCONNECT:
-               CDEBUG(D_INODE, "disconnect\n");
-               rc = ost_disconnect(ost, req);
-               break;
-       case OST_GET_INFO:
-               CDEBUG(D_INODE, "get_info\n");
-               rc = ost_get_info(ost, req);
-               break;
-       case OST_CREATE:
-               CDEBUG(D_INODE, "create\n");
-               rc = ost_create(ost, req);
-               break;
-       case OST_DESTROY:
-               CDEBUG(D_INODE, "destroy\n");
-               rc = ost_destroy(ost, req);
-               break;
-       case OST_GETATTR:
-               CDEBUG(D_INODE, "getattr\n");
-               rc = ost_getattr(ost, req);
-               break;
-       case OST_SETATTR:
-               CDEBUG(D_INODE, "setattr\n");
-               rc = ost_setattr(ost, req);
-               break;
-
-       default:
-               return ost_error(obddev, req);
-       }
+        void *journal_save;
+        int rc;
+        ENTRY;
 
-out:
-       req->rq_rephdr->status = rc;
-       if (rc) { 
-               printk("ost: processing error %d\n", rc);
-               ost_error(obddev, req);
-       } else { 
-               CDEBUG(D_INODE, "sending reply\n"); 
-               ost_reply(obddev, req); 
-       }
-
-       return 0;
+        /* Restore the filesystem journal context when we do the commit.
+         * This is needed for ext3 and reiserfs, but can't really hurt
+         * other filesystems.
+         */
+        journal_save = current->journal_info;
+        current->journal_info = bulk->b_desc->b_journal_info;
+        CDEBUG(D_BUFFS, "journal_info: saved %p->%p, restored %p\n", current,
+               journal_save, bulk->b_desc->b_journal_info);
+        rc = ost_commit_page(&bulk->b_desc->b_conn, bulk->b_page);
+        current->journal_info = journal_save;
+        CDEBUG(D_BUFFS, "journal_info: restored %p->%p\n", current,
+               journal_save);
+        if (rc)
+                CERROR("ost_commit_page failed: %d\n", rc);
+
+        RETURN(rc);
 }
 
-int ost_main(void *arg)
+static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc)
 {
-       struct obd_device *obddev = (struct obd_device *) arg;
-       struct ost_obd *ost = &obddev->u.ost;
-       ENTRY;
-       printk("---> %d\n", __LINE__);
-
-
-       lock_kernel();
-       printk("---> %d\n", __LINE__);
-       daemonize();
-       printk("---> %d\n", __LINE__);
-       spin_lock_irq(&current->sigmask_lock);
-       printk("---> %d\n", __LINE__);
-       sigfillset(&current->blocked);
-       printk("---> %d\n", __LINE__);
-       recalc_sigpending(current);
-       printk("---> %d\n", __LINE__);
-       spin_unlock_irq(&current->sigmask_lock);
-       printk("---> %d\n", __LINE__);
-
-       printk("---> %d\n", __LINE__);
-       sprintf(current->comm, "lustre_ost");
-       printk("---> %d\n", __LINE__);
-
-       /* Record that the  thread is running */
-       ost->ost_thread = current;
-       printk("---> %d\n", __LINE__);
-       wake_up(&ost->ost_done_waitq); 
-       printk("---> %d\n", __LINE__);
-
-       /* XXX maintain a list of all managed devices: insert here */
-
-       /* And now, wait forever for commit wakeup events. */
-       while (1) {
-               struct ost_request *request;
-               int rc; 
-
-               if (ost->ost_flags & OST_EXIT)
-                       break;
-
-
-               wake_up(&ost->ost_done_waitq);
-               interruptible_sleep_on(&ost->ost_waitq);
-
-               CDEBUG(D_INODE, "lustre_ost wakes\n");
-               CDEBUG(D_INODE, "pick up req here and continue\n"); 
-
-               if (list_empty(&ost->ost_reqs)) { 
-                       CDEBUG(D_INODE, "woke because of timer\n"); 
-               } else { 
-                       printk("---> %d\n", __LINE__);
-                       request = list_entry(ost->ost_reqs.next, 
-                                            struct ost_request, rq_list);
-                       printk("---> %d\n", __LINE__);
-                       list_del(&request->rq_list);
-                       rc = ost_handle(obddev, request); 
-               }
-       }
-
-       /* XXX maintain a list of all managed devices: cleanup here */
-       printk("---> %d\n", __LINE__);
-       ost->ost_thread = NULL;
-       printk("---> %d\n", __LINE__);
-       wake_up(&ost->ost_done_waitq);
-       printk("lustre_ost: exiting\n");
-       return 0;
+        ptlrpc_free_bulk(desc);
+
+        return 0;
 }
 
-static void ost_stop_srv_thread(struct ost_obd *ost)
+static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
-       ost->ost_flags |= OST_EXIT;
+        struct ptlrpc_bulk_desc *desc;
+        struct obd_conn conn;
+        struct niobuf_remote *remote_nb;
+        struct niobuf_local *local_nb, *lnb;
+        struct obd_ioobj *ioo;
+        struct ost_body *body;
+        int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
+        void *tmp1, *tmp2, *end2;
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+        end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+        objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
+        niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
+        cmd = body->data;
+
+        conn.oc_id = body->connid;
+        conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+
+        for (i = 0; i < objcount; i++) {
+                ost_unpack_ioo((void *)&tmp1, &ioo);
+                if (tmp2 + ioo->ioo_bufcnt > end2) {
+                        rc = -EFAULT;
+                        break;
+                }
+                for (j = 0; j < ioo->ioo_bufcnt; j++)
+                        ost_unpack_niobuf((void *)&tmp2, &remote_nb);
+        }
+
+        size[1] = niocount * sizeof(*remote_nb);
+        rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                GOTO(fail, rc);
+        remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
+
+        OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
+        if (local_nb == NULL)
+                GOTO(fail, rc = -ENOMEM);
+
+        /* The unpackers move tmp1 and tmp2, so reset them before using */
+        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+        req->rq_status = obd_preprw(cmd, &conn, objcount,
+                                    tmp1, niocount, tmp2, local_nb);
+        if (req->rq_status)
+                GOTO(success, 0);
+
+        desc = ptlrpc_prep_bulk(req->rq_connection);
+        if (desc == NULL)
+                GOTO(fail_preprw, rc = -ENOMEM);
+        desc->b_cb = ost_brw_write_finished_cb;
+        desc->b_portal = OSC_BULK_PORTAL;
+        memcpy(&(desc->b_conn), &conn, sizeof(conn));
+
+        /* Save journal context for commit callbacks */
+        CDEBUG(D_BUFFS, "journal_info: saved %p->%p\n", current,
+               current->journal_info);
+        desc->b_journal_info = current->journal_info;
+
+        for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
+                struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+                struct ptlrpc_bulk_page *bulk;
+
+                bulk = ptlrpc_prep_bulk_page(desc);
+                if (bulk == NULL)
+                        GOTO(fail_bulk, rc = -ENOMEM);
+
+                spin_lock(&srv->srv_lock);
+                bulk->b_xid = srv->srv_xid++;
+                spin_unlock(&srv->srv_lock);
+
+                bulk->b_buf = (void *)(unsigned long)lnb->addr;
+                bulk->b_page = lnb->page;
+                bulk->b_buflen = PAGE_SIZE;
+                bulk->b_cb = ost_brw_write_cb;
+
+                /* this advances remote_nb */
+                ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
+                                bulk->b_xid);
+        }
+
+        rc = ptlrpc_register_bulk(desc);
+        current->journal_info = NULL; /* kind of scary */
+        if (rc)
+                GOTO(fail_bulk, rc);
 
-       while (ost->ost_thread) {
-               wake_up(&ost->ost_waitq);
-               sleep_on(&ost->ost_done_waitq);
-       }
+        EXIT;
+ success:
+        OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+        return 0;
+
+ fail_bulk:
+        ptlrpc_free_bulk(desc);
+ fail_preprw:
+        OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+        /* FIXME: how do we undo the preprw? */
+ fail:
+        return rc;
 }
 
-static void ost_start_srv_thread(struct obd_device *obd)
+static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
-       struct ost_obd *ost = &obd->u.ost;
-       ENTRY;
-
-       init_waitqueue_head(&ost->ost_waitq);
-       printk("---> %d\n", __LINE__);
-       init_waitqueue_head(&ost->ost_done_waitq);
-       printk("---> %d\n", __LINE__);
-       kernel_thread(ost_main, (void *)obd, 
-                     CLONE_VM | CLONE_FS | CLONE_FILES);
-       printk("---> %d\n", __LINE__);
-       while (!ost->ost_thread) 
-               sleep_on(&ost->ost_done_waitq);
-       printk("---> %d\n", __LINE__);
-       EXIT;
+        struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        if (body->data == OBD_BRW_READ)
+                return ost_brw_read(obddev, req);
+        else
+                return ost_brw_write(obddev, req);
+}
+
+static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
+                      struct ptlrpc_request *req)
+{
+        int rc;
+        struct ost_obd *ost = &obddev->u.ost;
+        ENTRY;
+
+        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
+                CERROR("lustre_mds: Invalid request\n");
+                GOTO(out, rc);
+        }
+
+        if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
+                CERROR("lustre_mds: wrong packet type sent %d\n",
+                       req->rq_reqmsg->type);
+                GOTO(out, rc = -EINVAL);
+        }
+
+        switch (req->rq_reqmsg->opc) {
+        case OST_CONNECT:
+                CDEBUG(D_INODE, "connect\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
+                rc = ost_connect(ost, req);
+                break;
+        case OST_DISCONNECT:
+                CDEBUG(D_INODE, "disconnect\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
+                rc = ost_disconnect(ost, req);
+                break;
+        case OST_GET_INFO:
+                CDEBUG(D_INODE, "get_info\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_GET_INFO_NET, 0);
+                rc = ost_get_info(ost, req);
+                break;
+        case OST_CREATE:
+                CDEBUG(D_INODE, "create\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
+                rc = ost_create(ost, req);
+                break;
+        case OST_DESTROY:
+                CDEBUG(D_INODE, "destroy\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
+                rc = ost_destroy(ost, req);
+                break;
+        case OST_GETATTR:
+                CDEBUG(D_INODE, "getattr\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
+                rc = ost_getattr(ost, req);
+                break;
+        case OST_SETATTR:
+                CDEBUG(D_INODE, "setattr\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
+                rc = ost_setattr(ost, req);
+                break;
+        case OST_OPEN:
+                CDEBUG(D_INODE, "setattr\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
+                rc = ost_open(ost, req);
+                break;
+        case OST_CLOSE:
+                CDEBUG(D_INODE, "setattr\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
+                rc = ost_close(ost, req);
+                break;
+        case OST_BRW:
+                CDEBUG(D_INODE, "brw\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
+                rc = ost_brw(ost, req);
+                break;
+        case OST_PUNCH:
+                CDEBUG(D_INODE, "punch\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
+                rc = ost_punch(ost, req);
+                break;
+        default:
+                req->rq_status = -ENOTSUPP;
+                rc = ptlrpc_error(svc, req);
+                RETURN(rc);
+        }
+
+        EXIT;
+out:
+        //req->rq_status = rc;
+        if (rc) {
+                CERROR("ost: processing error %d\n", rc);
+                ptlrpc_error(svc, req);
+        } else {
+                CDEBUG(D_INODE, "sending reply\n");
+                ptlrpc_reply(svc, req);
+        }
+
+        return 0;
 }
 
 /* mount the file system (secretly) */
-static int ost_setup(struct obd_device *obddev, obd_count len,
-                       void *buf)
-                       
+static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
-       struct obd_ioctl_data* data = buf;
-       struct ost_obd *ost = &obddev->u.ost;
-       struct obd_device *tgt;
-       int err; 
+        struct obd_ioctl_data* data = buf;
+        struct ost_obd *ost = &obddev->u.ost;
+        struct obd_device *tgt;
+        int err;
         ENTRY;
 
-       if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
-               EXIT;
-               return -ENODEV;
-       }
+        if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
+                RETURN(-ENODEV);
 
+        MOD_INC_USE_COUNT;
         tgt = &obd_dev[data->ioc_dev];
-       ost->ost_tgt = tgt;
-        if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
-             ! (tgt->obd_flags & OBD_SET_UP) ){
-                printk("device not attached or not set up (%d)\n", 
+        ost->ost_tgt = tgt;
+        if (!(tgt->obd_flags & OBD_ATTACHED) ||
+            !(tgt->obd_flags & OBD_SET_UP)) {
+                CERROR("device not attached or not set up (%d)\n",
                        data->ioc_dev);
-                EXIT;
-               return -EINVAL;
-        } 
+                GOTO(error_dec, err = -EINVAL);
+        }
 
-       ost->ost_conn.oc_dev = tgt;
-       err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
-       if (err) { 
-               printk("lustre ost: fail to connect to device %d\n", 
-                      data->ioc_dev); 
-               return -EINVAL;
-       }
+        ost->ost_conn.oc_dev = tgt;
+        err = obd_connect(&ost->ost_conn);
+        if (err) {
+                CERROR("fail to connect to device %d\n", data->ioc_dev);
+                GOTO(error_dec, err = -EINVAL);
+        }
 
-       INIT_LIST_HEAD(&ost->ost_reqs);
-       ost->ost_thread = NULL;
-       ost->ost_flags = 0;
+        ost->ost_service = ptlrpc_init_svc(128 * 1024,
+                                           OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
+                                           "self", ost_handle);
+        if (!ost->ost_service) {
+                CERROR("failed to start service\n");
+                GOTO(error_disc, err = -EINVAL);
+        }
 
-       spin_lock_init(&obddev->u.ost.fo_lock);
+        err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
+        if (err)
+                GOTO(error_disc, err = -EINVAL);
+        err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
+        if (err)
+                GOTO(error_disc, err = -EINVAL);
 
-       ost_start_srv_thread(obddev);
+        RETURN(0);
 
-        MOD_INC_USE_COUNT;
-        EXIT; 
-        return 0;
-} 
+error_disc:
+        obd_disconnect(&ost->ost_conn);
+error_dec:
+        MOD_DEC_USE_COUNT;
+        RETURN(err);
+}
 
 static int ost_cleanup(struct obd_device * obddev)
 {
-       struct ost_obd *ost = &obddev->u.ost;
-       struct obd_device *tgt;
-       int err;
+        struct ost_obd *ost = &obddev->u.ost;
+        int err;
 
         ENTRY;
 
-        if ( !(obddev->obd_flags & OBD_SET_UP) ) {
-                EXIT;
-                return 0;
-        }
-
         if ( !list_empty(&obddev->obd_gen_clients) ) {
-                printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
-                EXIT;
-                return -EBUSY;
+                CERROR("still has clients!\n");
+                RETURN(-EBUSY);
         }
 
-       ost_stop_srv_thread(ost);
+        ptlrpc_stop_all_threads(ost->ost_service);
+        rpc_unregister_service(ost->ost_service);
 
-       if (!list_empty(&ost->ost_reqs)) {
-               // XXX reply with errors and clean up
-               CDEBUG(D_INODE, "Request list not empty!\n");
-       }
+        if (!list_empty(&ost->ost_service->srv_reqs)) {
+                // XXX reply with errors and clean up
+                CERROR("Request list not empty!\n");
+        }
+        OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
 
-       tgt = ost->ost_tgt;
-       err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
-       if (err) { 
-               printk("lustre ost: fail to disconnect device\n");
-               return -EINVAL;
-       }
-       
+        err = obd_disconnect(&ost->ost_conn);
+        if (err) {
+                CERROR("lustre ost: fail to disconnect device\n");
+                RETURN(-EINVAL);
+        }
 
         MOD_DEC_USE_COUNT;
-        EXIT;
-        return 0;
+        RETURN(0);
 }
 
 /* use obd ops to offer management infrastructure */
@@ -576,20 +688,17 @@ static struct obd_ops ost_obd_ops = {
 static int __init ost_init(void)
 {
         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
-       return 0;
+        return 0;
 }
 
 static void __exit ost_exit(void)
 {
-       obd_unregister_type(LUSTRE_OST_NAME);
+        obd_unregister_type(LUSTRE_OST_NAME);
 }
 
 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
 MODULE_LICENSE("GPL");
 
-// for testing (maybe this stays)
-EXPORT_SYMBOL(ost_queue_req);
-
 module_init(ost_init);
 module_exit(ost_exit);