Whamcloud - gitweb
Lproc-snmp code drop
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index f8c1317..9282752 100644 (file)
-/*
- *  ost/ost_handler.c
- *  Storage Target Handling functions
- *  
- *  Lustre Object Server Module (OST)
- * 
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ *   Author: Peter J. Braam <braam@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
  *
- *  This code is issued under the GNU General Public License.
- *  See the file COPYING in this distribution
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *  Storage Target Handling functions
+ *  Lustre Object Server Module (OST)
  *
- *  by Peter Braam <braam@clusterfs.com>
- * 
  *  This server is single threaded at present (but can easily be multi
  *  threaded). For testing and management it is treated as an
  *  obd_device, although it does not export a full OBD method table
  *  (the requests are coming in over the wire, so object target
  *  modules do not have a full method table.)
- * 
  */
 
 #define EXPORT_SYMTAB
+#define DEBUG_SUBSYSTEM S_OST
 
-#include <linux/version.h>
 #include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/ext2_fs.h>
-#include <linux/quotaops.h>
-#include <asm/unistd.h>
-
-#define DEBUG_SUBSYSTEM S_OST
+#include <linux/obd_ost.h>
+#include <linux/lustre_net.h>
+#include <linux/lustre_dlm.h>
+#include <linux/init.h>
+#include <linux/lprocfs_status.h>
 
-#include <linux/obd_support.h>
-#include <linux/obd.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_idl.h>
-#include <linux/lustre_mds.h>
-#include <linux/obd_class.h>
+extern lprocfs_vars_t status_var_nm_1[];
+extern lprocfs_vars_t status_class_var[];
 
-// for testing
-static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
+static int ost_destroy(struct ptlrpc_request *req)
 {
-       struct ptlrpc_request *srv_req; 
-       struct ost_obd *ost = &obddev->u.ost;
-       
-       if (!ost) { 
-               EXIT;
-               return -1;
-       }
-
-       OBD_ALLOC(srv_req, sizeof(*srv_req));
-       if (!srv_req) { 
-               EXIT;
-               return -ENOMEM;
-       }
-
-        CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n",
-              __LINE__, ost, req, srv_req);
-
-       memset(srv_req, 0, sizeof(*req)); 
-
-       /* move the request buffer */
-       srv_req->rq_reqbuf = req->rq_reqbuf;
-       srv_req->rq_reqlen    = req->rq_reqlen;
-       srv_req->rq_ost = ost;
-
-       /* remember where it came from */
-       srv_req->rq_reply_handle = req;
-
-       list_add(&srv_req->rq_list, &ost->ost_reqs); 
-       wake_up(&ost->ost_waitq);
-       return 0;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        req->rq_status = obd_destroy(conn, &body->oa, NULL);
+        RETURN(0);
 }
 
-int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
+static int ost_getattr(struct ptlrpc_request *req)
 {
-       struct ptlrpc_request *clnt_req = req->rq_reply_handle;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       ENTRY;
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
 
-       if (req->rq_ost->ost_service != NULL) {
-               /* This is a request that came from the network via portals. */
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
 
-               /* FIXME: we need to increment the count of handled events */
-               ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0);
-       } else {
-               /* This is a local request that came from another thread. */
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
+        RETURN(0);
+}
 
-               /* move the reply to the client */ 
-               clnt_req->rq_replen = req->rq_replen;
-               clnt_req->rq_repbuf = req->rq_repbuf;
-               req->rq_repbuf = NULL;
-               req->rq_replen = 0;
+static int ost_statfs(struct ptlrpc_request *req)
+{
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct obd_statfs *osfs;
+        int rc, size = sizeof(*osfs);
+        ENTRY;
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
 
-               /* free the request buffer */
-               OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
-               req->rq_reqbuf = NULL;
+        osfs = lustre_msg_buf(req->rq_repmsg, 0);
+        memset(osfs, 0, size);
 
-               /* wake up the client */ 
-               wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
-       }
+        rc = obd_statfs(conn, osfs);
+        if (rc) {
+                CERROR("ost: statfs failed: rc %d\n", rc);
+                req->rq_status = rc;
+                RETURN(rc);
+        }
+        obd_statfs_pack(osfs, osfs);
 
-       EXIT;
-       return 0;
+        RETURN(0);
 }
 
-int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
+static int ost_open(struct ptlrpc_request *req)
 {
-       struct ptlrep_hdr *hdr;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       ENTRY;
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
 
-       OBD_ALLOC(hdr, sizeof(*hdr));
-       if (!hdr) { 
-               EXIT;
-               return -ENOMEM;
-       }
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
 
-       memset(hdr, 0, sizeof(*hdr));
-       
-       hdr->seqno = req->rq_reqhdr->seqno;
-       hdr->status = req->rq_status; 
-       hdr->type = OST_TYPE_ERR;
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_open(conn, &repbody->oa, NULL);
+        RETURN(0);
+}
 
-       req->rq_repbuf = (char *)hdr;
-       req->rq_replen = sizeof(*hdr); 
+static int ost_close(struct ptlrpc_request *req)
+{
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       EXIT;
-       return ost_reply(obddev, req);
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_close(conn, &repbody->oa, NULL);
+        RETURN(0);
 }
 
-static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_create(struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
-
-       ENTRY;
-       
-       conn.oc_id = req->rq_req.ost->connid;
-       conn.oc_dev = ost->ost_tgt;
-
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_destroy: cannot pack reply\n"); 
-               return rc;
-       }
-
-       req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
-               (&conn, &req->rq_req.ost->oa); 
-
-       EXIT;
-       return 0;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_create(conn, &repbody->oa, NULL);
+        RETURN(0);
 }
 
-static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_punch(struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
-
-       ENTRY;
-       
-       conn.oc_id = req->rq_req.ost->connid;
-       conn.oc_dev = ost->ost_tgt;
-
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_getattr: cannot pack reply\n"); 
-               return rc;
-       }
-       req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
-       req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
-
-       req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
-               (&conn, &req->rq_rep.ost->oa); 
-
-       EXIT;
-       return 0;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
+            (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
+                RETURN(-EINVAL);
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_punch(conn, &repbody->oa, NULL,
+                                   repbody->oa.o_size, repbody->oa.o_blocks);
+        RETURN(0);
 }
 
-static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_setattr(struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       ENTRY;
-       
-       conn.oc_id = req->rq_req.ost->connid;
-       conn.oc_dev = ost->ost_tgt;
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
 
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_create: cannot pack reply\n"); 
-               return rc;
-       }
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
 
-       memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
+        RETURN(0);
+}
 
-       req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
-               (&conn, &req->rq_rep.ost->oa); 
+static int ost_bulk_timeout(void *data)
+{
+        struct ptlrpc_bulk_desc *desc = data;
 
-       EXIT;
-       return 0;
+        ENTRY;
+        CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
+        RETURN(1);
 }
 
-
-static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_brw_read(struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ptlrpc_bulk_desc *desc;
+        void *tmp1, *tmp2, *end2;
+        struct niobuf_remote *remote_nb;
+        struct niobuf_local *local_nb = NULL;
+        struct obd_ioobj *ioo;
+        struct ost_body *body;
+        struct l_wait_info lwi;
+        void *desc_priv = NULL;
+        int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
+        ENTRY;
 
-       ENTRY;
-       
-       conn.oc_id = req->rq_req.ost->connid;
-       conn.oc_dev = ost->ost_tgt;
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+        end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+        objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
+        niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
+        cmd = OBD_BRW_READ;
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
+                GOTO(out, rc = 0);
+
+        for (i = 0; i < objcount; i++) {
+                ost_unpack_ioo(&tmp1, &ioo);
+                if (tmp2 + ioo->ioo_bufcnt > end2) {
+                        LBUG();
+                        GOTO(out, rc = -EFAULT);
+                }
+                for (j = 0; j < ioo->ioo_bufcnt; j++)
+                        ost_unpack_niobuf(&tmp2, &remote_nb);
+        }
 
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
-               return rc;
-       }
+        OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
+        if (local_nb == NULL)
+                GOTO(out, rc = -ENOMEM);
 
-       memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
-              sizeof(req->rq_req.ost->oa));
+        /* The unpackers move tmp1 and tmp2, so reset them before using */
+        ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+        remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+        req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
+                                    remote_nb, local_nb, &desc_priv);
 
-       req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
-               (&conn, &req->rq_rep.ost->oa); 
+        if (req->rq_status)
+                GOTO(out, rc = 0);
 
-       EXIT;
-       return 0;
-}
+        desc = ptlrpc_prep_bulk(req->rq_connection);
+        if (desc == NULL)
+                GOTO(out_local, rc = -ENOMEM);
+        desc->bd_portal = OST_BULK_PORTAL;
 
-static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
-{
-       struct obd_conn conn; 
-       int rc;
-
-       ENTRY;
-       
-       conn.oc_dev = ost->ost_tgt;
-
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
-               return rc;
-       }
-
-       req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
-
-        CDEBUG(0, "ost_connect: rep buffer %p, id %d\n", req->rq_repbuf,
-              conn.oc_id);
-       req->rq_rep.ost->connid = conn.oc_id;
-       EXIT;
-       return 0;
-}
+        for (i = 0; i < niocount; i++) {
+                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
 
-static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
-{
-       struct obd_conn conn; 
-       int rc;
+                if (bulk == NULL)
+                        GOTO(out_bulk, rc = -ENOMEM);
+                bulk->bp_xid = remote_nb[i].xid;
+                bulk->bp_buf = local_nb[i].addr;
+                bulk->bp_buflen = remote_nb[i].len;
+        }
 
-       ENTRY;
-       
-       conn.oc_dev = ost->ost_tgt;
-       conn.oc_id = req->rq_req.ost->connid;
+        rc = ptlrpc_send_bulk(desc);
+        if (rc)
+                GOTO(out_bulk, rc);
 
-       rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
-               return rc;
-       }
+        lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+        rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, &lwi);
+        if (rc) {
+                LASSERT(rc == -ETIMEDOUT);
+                GOTO(out_bulk, rc);
+        }
 
-       req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
+        req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
+                                      local_nb, desc_priv);
 
-       EXIT;
-       return 0;
-}
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
 
-static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
-{
-       struct obd_conn conn; 
-       int rc;
-       int vallen;
-       void *val;
-       char *ptr; 
-
-       ENTRY;
-       
-       conn.oc_id = req->rq_req.ost->connid;
-       conn.oc_dev = ost->ost_tgt;
-
-       ptr = ost_req_buf1(req->rq_req.ost);
-       req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
-               (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val); 
-
-       rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_setattr: cannot pack reply\n"); 
-               return rc;
-       }
-
-       EXIT;
-       return 0;
+out_bulk:
+        ptlrpc_free_bulk(desc);
+out_local:
+        OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
+out:
+        if (rc) {
+                /* It's a lot of work to delay allocating the reply, and a lot
+                 * less work to just free it here. */
+                OBD_FREE(req->rq_repmsg, req->rq_replen);
+                req->rq_repmsg = NULL;
+                ptlrpc_error(req->rq_svc, req);
+        } else
+                ptlrpc_reply(req->rq_svc, req);
+        RETURN(rc);
 }
 
-int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+static int ost_brw_write(struct ptlrpc_request *req)
 {
-       struct obd_conn conn; 
-       int rc;
-       int i, j;
-       int objcount, niocount;
-       char *tmp1, *tmp2, *end2;
-       char *res;
-       int cmd;
-       struct niobuf *nb, *src, *dst;
-       struct obd_ioobj *ioo;
-       struct ost_req *r = req->rq_req.ost;
-
-       ENTRY;
-       
-       tmp1 = ost_req_buf1(r);
-       tmp2 = ost_req_buf2(r);
-       end2 = tmp2 + req->rq_req.ost->buflen2;
-       objcount = r->buflen1 / sizeof(*ioo); 
-       niocount = r->buflen2 / sizeof(*nb); 
-       cmd = r->cmd;
-
-       conn.oc_id = req->rq_req.ost->connid;
-       conn.oc_dev = req->rq_ost->ost_tgt;
-
-       rc = ost_pack_rep(NULL, niocount, NULL, 0, 
-                         &req->rq_rephdr, &req->rq_rep.ost,
-                         &req->rq_replen, &req->rq_repbuf); 
-       if (rc) { 
-               printk("ost_create: cannot pack reply\n"); 
-               return rc;
-       }
-       res = ost_rep_buf1(req->rq_rep.ost); 
-
-       for (i=0; i < objcount; i++) { 
-               ost_unpack_ioo((void *)&tmp1, &ioo);
-               if (tmp2 + ioo->ioo_bufcnt > end2) { 
-                       rc = -EFAULT;
-                       break; 
-               }
-               for (j = 0 ; j < ioo->ioo_bufcnt ; j++) { 
-                       ost_unpack_niobuf((void *)&tmp2, &nb); 
-               }
-       }
-
-       /* The unpackers move tmp1 and tmp2, so reset them before using */
-       tmp1 = ost_req_buf1(r);
-       tmp2 = ost_req_buf2(r);
-       req->rq_rep.ost->result = 
-               req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw
-               (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
-                niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
-
-       if (cmd == OBD_BRW_WRITE) { 
-               for (i=0; i<niocount; i++) { 
-                       src = &((struct niobuf *)tmp2)[i];
-                       dst = &((struct niobuf *)res)[i];
-                       memcpy((void *)(unsigned long)dst->addr, 
-                              (void *)(unsigned long)src->addr, 
-                              src->len);
-               }
-       } else { 
-               for (i=0; i<niocount; i++) { 
-                       dst = &((struct niobuf *)tmp2)[i];
-                       src = &((struct niobuf *)res)[i];
-                       memcpy((void *)(unsigned long)dst->addr, 
-                              (void *)(unsigned long)src->addr, 
-                              PAGE_SIZE); 
-               }
-       }
-
-       req->rq_rep.ost->result = 
-               req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw
-               (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
-                niocount, (struct niobuf *)res); 
-
-       EXIT;
-       return 0;
-}
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ptlrpc_bulk_desc *desc;
+        struct niobuf_remote *remote_nb;
+        struct niobuf_local *local_nb, *lnb;
+        struct obd_ioobj *ioo;
+        struct ost_body *body;
+        int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
+        void *tmp1, *tmp2, *end2;
+        void *desc_priv = NULL;
+        int reply_sent = 0;
+        struct ptlrpc_service *srv;
+        struct l_wait_info lwi;
+        __u32 xid;
+        ENTRY;
 
-int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
-{
-       int rc;
-       struct ost_obd *ost = &obddev->u.ost;
-       struct ptlreq_hdr *hdr;
-
-       ENTRY;
-        CDEBUG(0, "req at %p\n", req);
-
-       hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
-       if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
-               printk("lustre_ost: wrong packet type sent %d\n",
-                      NTOH__u32(hdr->type));
-               rc = -EINVAL;
-               goto out;
-       }
-
-       rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
-                           &req->rq_reqhdr, &req->rq_req.ost);
-       if (rc) { 
-               printk("lustre_ost: Invalid request\n");
-               EXIT; 
-               goto out;
-       }
-
-       switch (req->rq_reqhdr->opc) { 
-
-       case OST_CONNECT:
-               CDEBUG(D_INODE, "connect\n");
-               rc = ost_connect(ost, req);
-               break;
-       case OST_DISCONNECT:
-               CDEBUG(D_INODE, "disconnect\n");
-               rc = ost_disconnect(ost, req);
-               break;
-       case OST_GET_INFO:
-               CDEBUG(D_INODE, "get_info\n");
-               rc = ost_get_info(ost, req);
-               break;
-       case OST_CREATE:
-               CDEBUG(D_INODE, "create\n");
-               rc = ost_create(ost, req);
-               break;
-       case OST_DESTROY:
-               CDEBUG(D_INODE, "destroy\n");
-               rc = ost_destroy(ost, req);
-               break;
-       case OST_GETATTR:
-               CDEBUG(D_INODE, "getattr\n");
-               rc = ost_getattr(ost, req);
-               break;
-       case OST_SETATTR:
-               CDEBUG(D_INODE, "setattr\n");
-               rc = ost_setattr(ost, req);
-               break;
-       case OST_BRW:
-               CDEBUG(D_INODE, "brw\n");
-               rc = ost_brw(ost, req);
-               break;
-       default:
-               req->rq_status = -ENOTSUPP;
-               return ost_error(obddev, req);
-       }
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+        end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+        objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
+        niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
+        cmd = OBD_BRW_WRITE;
+
+        for (i = 0; i < objcount; i++) {
+                ost_unpack_ioo((void *)&tmp1, &ioo);
+                if (tmp2 + ioo->ioo_bufcnt > end2) {
+                        rc = -EFAULT;
+                        break;
+                }
+                for (j = 0; j < ioo->ioo_bufcnt; j++)
+                        ost_unpack_niobuf((void *)&tmp2, &remote_nb);
+        }
 
+        size[1] = niocount * sizeof(*remote_nb);
+        rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                GOTO(out, rc);
+        remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
+
+        OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
+        if (local_nb == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        /* The unpackers move tmp1 and tmp2, so reset them before using */
+        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+        req->rq_status = obd_preprw(cmd, conn, objcount, tmp1, niocount, tmp2,
+                                    local_nb, &desc_priv);
+        if (req->rq_status)
+                GOTO(out_free, rc = 0); /* XXX is this correct? */
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
+                GOTO(fail_preprw, rc = 0);
+
+        desc = ptlrpc_prep_bulk(req->rq_connection);
+        if (desc == NULL)
+                GOTO(fail_preprw, rc = -ENOMEM);
+        desc->bd_cb = NULL;
+        desc->bd_portal = OSC_BULK_PORTAL;
+        desc->bd_desc_private = desc_priv;
+        memcpy(&(desc->bd_conn), &conn, sizeof(conn));
+
+        srv = req->rq_obd->u.ost.ost_service;
+        spin_lock(&srv->srv_lock);
+        xid = srv->srv_xid++;                   /* single xid for all pages */
+        spin_unlock(&srv->srv_lock);
+
+        for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
+                struct ptlrpc_bulk_page *bulk;
+
+                bulk = ptlrpc_prep_bulk_page(desc);
+                if (bulk == NULL)
+                        GOTO(fail_bulk, rc = -ENOMEM);
+
+                bulk->bp_xid = xid;              /* single xid for all pages */
+
+                bulk->bp_buf = lnb->addr;
+                bulk->bp_page = lnb->page;
+                bulk->bp_flags = lnb->flags;
+                bulk->bp_dentry = lnb->dentry;
+                bulk->bp_buflen = lnb->len;
+                bulk->bp_cb = NULL;
+
+                /* this advances remote_nb */
+                ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
+                                bulk->bp_xid);
+        }
+
+        rc = ptlrpc_register_bulk(desc);
+        if (rc)
+                GOTO(fail_bulk, rc);
+
+        reply_sent = 1;
+        ptlrpc_reply(req->rq_svc, req);
+
+        lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+        rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
+                          &lwi);
+        if (rc) {
+                if (rc != -ETIMEDOUT)
+                        LBUG();
+                GOTO(fail_bulk, rc);
+        }
+
+        rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
+                          desc->bd_desc_private);
+        ptlrpc_free_bulk(desc);
+        EXIT;
+out_free:
+        OBD_FREE(local_nb, niocount * sizeof(*local_nb));
 out:
-       req->rq_status = rc;
-       if (rc) { 
-               printk("ost: processing error %d\n", rc);
-               ost_error(obddev, req);
-       } else { 
-               CDEBUG(D_INODE, "sending reply\n"); 
-               ost_reply(obddev, req); 
-       }
-
-       return 0;
-}
+        if (!reply_sent) {
+                if (rc) {
+                        OBD_FREE(req->rq_repmsg, req->rq_replen);
+                        req->rq_repmsg = NULL;
+                        ptlrpc_error(req->rq_svc, req);
+                } else
+                        ptlrpc_reply(req->rq_svc, req);
+        }
+        return rc;
 
-int ost_main(void *arg)
-{
-       struct obd_device *obddev = (struct obd_device *) arg;
-       struct ost_obd *ost = &obddev->u.ost;
-       ENTRY;
-
-       lock_kernel();
-       daemonize();
-       spin_lock_irq(&current->sigmask_lock);
-       sigfillset(&current->blocked);
-       recalc_sigpending(current);
-       spin_unlock_irq(&current->sigmask_lock);
-
-       sprintf(current->comm, "lustre_ost");
-
-       /* Record that the  thread is running */
-       ost->ost_thread = current;
-       wake_up(&ost->ost_done_waitq); 
-
-       /* XXX maintain a list of all managed devices: insert here */
-
-       /* And now, wait forever for commit wakeup events. */
-       while (1) {
-               int rc; 
-
-               if (ost->ost_flags & OST_EXIT)
-                       break;
-
-               wake_up(&ost->ost_done_waitq);
-               interruptible_sleep_on(&ost->ost_waitq);
-
-               CDEBUG(D_INODE, "lustre_ost wakes\n");
-               CDEBUG(D_INODE, "pick up req here and continue\n"); 
-
-
-               if (ost->ost_service != NULL) {
-                       ptl_event_t ev;
-
-                       while (1) {
-                               struct ptlrpc_request request;
-                               struct ptlrpc_service *service;
-
-                               rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
-                               if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
-                                       break;
-
-                               service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
-
-                               /* FIXME: If we move to an event-driven model,
-                                * we should put the request on the stack of
-                                * mds_handle instead. */
-                               memset(&request, 0, sizeof(request));
-                               request.rq_reqbuf = ev.mem_desc.start +
-                                       ev.offset;
-                               request.rq_reqlen = ev.mem_desc.length;
-                               request.rq_ost = ost;
-                               request.rq_xid = ev.match_bits;
-
-                               request.rq_peer.peer_nid = ev.initiator.nid;
-                               /* FIXME: this NI should be the incoming NI.
-                                * We don't know how to find that from here. */
-                               request.rq_peer.peer_ni =
-                                       ost->ost_service->srv_self.peer_ni;
-                               rc = ost_handle(obddev, &request);
-
-                               /* Inform the rpc layer the event has been handled */
-                                ptl_received_rpc(service);
-                       }
-               } else {
-                       struct ptlrpc_request *request;
-
-                       if (list_empty(&ost->ost_reqs)) { 
-                               CDEBUG(D_INODE, "woke because of timer\n"); 
-                       } else { 
-                               request = list_entry(ost->ost_reqs.next,
-                                                    struct ptlrpc_request,
-                                                    rq_list);
-                               list_del(&request->rq_list);
-                               rc = ost_handle(obddev, request); 
-                       }
-               }
-       }
-
-       /* XXX maintain a list of all managed devices: cleanup here */
-
-       ost->ost_thread = NULL;
-       wake_up(&ost->ost_done_waitq);
-       printk("lustre_ost: exiting\n");
-       return 0;
+fail_bulk:
+        ptlrpc_free_bulk(desc);
+fail_preprw:
+        /* FIXME: how do we undo the preprw? */
+        goto out_free;
 }
 
-static void ost_stop_srv_thread(struct ost_obd *ost)
+static int ost_handle(struct ptlrpc_request *req)
 {
-       ost->ost_flags |= OST_EXIT;
+        int rc;
+        ENTRY;
 
-       while (ost->ost_thread) {
-               wake_up(&ost->ost_waitq);
-               sleep_on(&ost->ost_done_waitq);
-       }
-}
+        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
+                CERROR("lustre_ost: Invalid request\n");
+                GOTO(out, rc);
+        }
 
-static void ost_start_srv_thread(struct obd_device *obd)
-{
-       struct ost_obd *ost = &obd->u.ost;
-       ENTRY;
-
-       init_waitqueue_head(&ost->ost_waitq);
-       init_waitqueue_head(&ost->ost_done_waitq);
-       kernel_thread(ost_main, (void *)obd, 
-                     CLONE_VM | CLONE_FS | CLONE_FILES);
-       while (!ost->ost_thread) 
-               sleep_on(&ost->ost_done_waitq);
-       EXIT;
+        if (req->rq_reqmsg->opc != OST_CONNECT &&
+            req->rq_export == NULL) {
+                CERROR("lustre_ost: operation %d on unconnected OST\n",
+                       req->rq_reqmsg->opc);
+                GOTO(out, rc = -ENOTCONN);
+        }
+
+        if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
+                GOTO(out, rc = -EINVAL);
+
+        switch (req->rq_reqmsg->opc) {
+        case OST_CONNECT:
+                CDEBUG(D_INODE, "connect\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
+                rc = target_handle_connect(req);
+                break;
+        case OST_DISCONNECT:
+                CDEBUG(D_INODE, "disconnect\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
+                rc = target_handle_disconnect(req);
+                break;
+        case OST_CREATE:
+                CDEBUG(D_INODE, "create\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
+                rc = ost_create(req);
+                break;
+        case OST_DESTROY:
+                CDEBUG(D_INODE, "destroy\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
+                rc = ost_destroy(req);
+                break;
+        case OST_GETATTR:
+                CDEBUG(D_INODE, "getattr\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
+                rc = ost_getattr(req);
+                break;
+        case OST_SETATTR:
+                CDEBUG(D_INODE, "setattr\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
+                rc = ost_setattr(req);
+                break;
+        case OST_OPEN:
+                CDEBUG(D_INODE, "open\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
+                rc = ost_open(req);
+                break;
+        case OST_CLOSE:
+                CDEBUG(D_INODE, "close\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
+                rc = ost_close(req);
+                break;
+        case OST_WRITE:
+                CDEBUG(D_INODE, "write\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
+                rc = ost_brw_write(req);
+                /* ost_brw sends its own replies */
+                RETURN(rc);
+        case OST_READ:
+                CDEBUG(D_INODE, "read\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
+                rc = ost_brw_read(req);
+                /* ost_brw sends its own replies */
+                RETURN(rc);
+        case OST_PUNCH:
+                CDEBUG(D_INODE, "punch\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
+                rc = ost_punch(req);
+                break;
+        case OST_STATFS:
+                CDEBUG(D_INODE, "statfs\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
+                rc = ost_statfs(req);
+                break;
+        case LDLM_ENQUEUE:
+                CDEBUG(D_INODE, "enqueue\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+                rc = ldlm_handle_enqueue(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_CONVERT:
+                CDEBUG(D_INODE, "convert\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+                rc = ldlm_handle_convert(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_CANCEL:
+                CDEBUG(D_INODE, "cancel\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
+                rc = ldlm_handle_cancel(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_BL_CALLBACK:
+        case LDLM_CP_CALLBACK:
+                CDEBUG(D_INODE, "callback\n");
+                CERROR("callbacks should not happen on OST\n");
+                LBUG();
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
+                break;
+        default:
+                req->rq_status = -ENOTSUPP;
+                rc = ptlrpc_error(req->rq_svc, req);
+                RETURN(rc);
+        }
+
+        EXIT;
+out:
+        //req->rq_status = rc;
+        if (rc) {
+                CERROR("ost: processing error (opcode=%d): %d\n",
+                       req->rq_reqmsg->opc, rc);
+                ptlrpc_error(req->rq_svc, req);
+        } else {
+                CDEBUG(D_INODE, "sending reply\n");
+                if (req->rq_repmsg == NULL)
+                        CERROR("handler for opcode %d returned rc=0 without "
+                               "creating rq_repmsg; needs to return rc != "
+                               "0!\n", req->rq_reqmsg->opc);
+                ptlrpc_reply(req->rq_svc, req);
+        }
+
+        return 0;
 }
 
+#define OST_NUM_THREADS 6
+
 /* mount the file system (secretly) */
-static int ost_setup(struct obd_device *obddev, obd_count len,
-                       void *buf)
-                       
+static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
-       struct obd_ioctl_data* data = buf;
-       struct ost_obd *ost = &obddev->u.ost;
-       struct obd_device *tgt;
-       struct lustre_peer peer;
-       int err; 
+        struct obd_ioctl_data* data = buf;
+        struct ost_obd *ost = &obddev->u.ost;
+        struct obd_device *tgt;
+        int err;
+        int i;
         ENTRY;
 
-       if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
-               EXIT;
-               return -ENODEV;
-       }
+        if (data->ioc_inllen1 < 1) {
+                CERROR("requires a TARGET OBD UUID\n");
+                RETURN(-EINVAL);
+        }
+        if (data->ioc_inllen1 > 37) {
+                CERROR("OBD UUID must be less than 38 characters\n");
+                RETURN(-EINVAL);
+        }
 
-        tgt = &obd_dev[data->ioc_dev];
-       ost->ost_tgt = tgt;
-        if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
-             ! (tgt->obd_flags & OBD_SET_UP) ){
-                printk("device not attached or not set up (%d)\n", 
+        MOD_INC_USE_COUNT;
+        tgt = class_uuid2obd(data->ioc_inlbuf1);
+        if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
+            !(tgt->obd_flags & OBD_SET_UP)) {
+                CERROR("device not attached or not set up (%d)\n",
                        data->ioc_dev);
-                EXIT;
-               return -EINVAL;
-        } 
-
-       ost->ost_conn.oc_dev = tgt;
-       err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
-       if (err) { 
-               printk("lustre ost: fail to connect to device %d\n", 
-                      data->ioc_dev); 
-               return -EINVAL;
-       }
-
-       INIT_LIST_HEAD(&ost->ost_reqs);
-       ost->ost_thread = NULL;
-       ost->ost_flags = 0;
-
-       spin_lock_init(&obddev->u.ost.ost_lock);
-
-       err = kportal_uuid_to_peer("self", &peer);
-       if (err == 0) {
-               OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service));
-               if (ost->ost_service == NULL)
-                       return -ENOMEM;
-               ost->ost_service->srv_buf_size = 64 * 1024;
-               ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
-               memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
-               ost->ost_service->srv_wait_queue = &ost->ost_waitq;
-
-               rpc_register_service(ost->ost_service, "self");
-       }
-
-       ost_start_srv_thread(obddev);
+                GOTO(error_dec, err = -EINVAL);
+        }
 
-        MOD_INC_USE_COUNT;
-        EXIT; 
-        return 0;
-} 
+        err = obd_connect(&ost->ost_conn, tgt, NULL, NULL, NULL);
+        if (err) {
+                CERROR("fail to connect to device %d\n", data->ioc_dev);
+                GOTO(error_dec, err = -EINVAL);
+        }
+
+        ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
+                                           OST_BUFSIZE, OST_MAXREQSIZE,
+                                           OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 
+                                           "self", ost_handle, "ost");
+        if (!ost->ost_service) {
+                CERROR("failed to start service\n");
+                GOTO(error_disc, err = -EINVAL);
+        }
+
+        for (i = 0; i < OST_NUM_THREADS; i++) {
+                char name[32];
+                sprintf(name, "lustre_ost_%02d", i);
+                err = ptlrpc_start_thread(obddev, ost->ost_service, name);
+                if (err) {
+                        CERROR("error starting thread #%d: rc %d\n", i, err);
+                        GOTO(error_disc, err = -EINVAL);
+                }
+        }
+
+        RETURN(0);
+
+error_disc:
+        obd_disconnect(&ost->ost_conn);
+error_dec:
+        MOD_DEC_USE_COUNT;
+        RETURN(err);
+}
 
 static int ost_cleanup(struct obd_device * obddev)
 {
-       struct ost_obd *ost = &obddev->u.ost;
-       struct obd_device *tgt;
-       int err;
+        struct ost_obd *ost = &obddev->u.ost;
+        int err;
 
         ENTRY;
 
-        if ( !(obddev->obd_flags & OBD_SET_UP) ) {
-                EXIT;
-                return 0;
+        if ( !list_empty(&obddev->obd_exports) ) {
+                CERROR("still has clients!\n");
+                RETURN(-EBUSY);
         }
 
-        if ( !list_empty(&obddev->obd_gen_clients) ) {
-                printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
-                EXIT;
-                return -EBUSY;
-        }
+        ptlrpc_stop_all_threads(ost->ost_service);
+        ptlrpc_unregister_service(ost->ost_service);
 
-       ost_stop_srv_thread(ost);
-       rpc_unregister_service(ost->ost_service);
-        OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
-
-       if (!list_empty(&ost->ost_reqs)) {
-               // XXX reply with errors and clean up
-               CDEBUG(D_INODE, "Request list not empty!\n");
-       }
-
-       tgt = ost->ost_tgt;
-       err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
-       if (err) { 
-               printk("lustre ost: fail to disconnect device\n");
-               return -EINVAL;
-       }
-       
+        err = obd_disconnect(&ost->ost_conn);
+        if (err) {
+                CERROR("lustre ost: fail to disconnect device\n");
+                RETURN(-EINVAL);
+        }
 
         MOD_DEC_USE_COUNT;
-        EXIT;
+        RETURN(0);
+}
+int ost_attach(struct obd_device *dev, 
+                   obd_count len, void *data)
+{
+        /*  lprocfs_reg_dev(dev, (lprocfs_group_t*)lprocfs_ptlrpc_nm,
+                        sizeof(struct lprofiler_ptlrpc));
+        */
+        lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
+        return 0; 
+}
+
+int ost_detach(struct obd_device *dev)
+{
+        /* lprocfs_dereg_dev(dev); */
+        lprocfs_dereg_obd(dev);
         return 0;
+
 }
 
+
+
 /* use obd ops to offer management infrastructure */
 static struct obd_ops ost_obd_ops = {
+        o_attach:      ost_attach,
+        o_detach:      ost_detach,
         o_setup:       ost_setup,
         o_cleanup:     ost_cleanup,
 };
 
 static int __init ost_init(void)
 {
-        obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
-       return 0;
+        int rc;
+
+        rc = class_register_type(&ost_obd_ops,
+                                 (lprocfs_vars_t*)status_class_var, 
+                                 LUSTRE_OST_NAME);
+        if (rc) RETURN(rc);
+
+        return 0;
+
 }
 
 static void __exit ost_exit(void)
 {
-       obd_unregister_type(LUSTRE_OST_NAME);
+        
+        class_unregister_type(LUSTRE_OST_NAME);
 }
 
-MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
 MODULE_LICENSE("GPL");
 
-// for testing (maybe this stays)
-EXPORT_SYMBOL(ost_queue_req);
-
 module_init(ost_init);
 module_exit(ost_exit);