Whamcloud - gitweb
Lproc-snmp code drop
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index 7b0600e..9282752 100644 (file)
 #include <linux/obd_ost.h>
 #include <linux/lustre_net.h>
 #include <linux/lustre_dlm.h>
+#include <linux/init.h>
+#include <linux/lprocfs_status.h>
 
-static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
+extern lprocfs_vars_t status_var_nm_1[];
+extern lprocfs_vars_t status_class_var[];
+
+static int ost_destroy(struct ptlrpc_request *req)
 {
-        struct obd_conn conn;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        conn.oc_id = body->connid;
-        conn.oc_dev = ost->ost_tgt;
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
-        req->rq_status = obd_destroy(&conn, &body->oa);
+        req->rq_status = obd_destroy(conn, &body->oa, NULL);
         RETURN(0);
 }
 
-static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_getattr(struct ptlrpc_request *req)
 {
-        struct obd_conn conn;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*body);
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        conn.oc_id = body->connid;
-        conn.oc_dev = ost->ost_tgt;
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_getattr(&conn, &repbody->oa);
+        req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
         RETURN(0);
 }
 
-static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_statfs(struct ptlrpc_request *req)
 {
-        struct obd_conn conn;
-        struct ost_body *body, *repbody;
-        int rc, size = sizeof(*body);
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct obd_statfs *osfs;
+        int rc, size = sizeof(*osfs);
         ENTRY;
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        conn.oc_id = body->connid;
-        conn.oc_dev = ost->ost_tgt;
-
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf(req->rq_repmsg, 0);
-        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_open(&conn, &repbody->oa);
+        osfs = lustre_msg_buf(req->rq_repmsg, 0);
+        memset(osfs, 0, size);
+
+        rc = obd_statfs(conn, osfs);
+        if (rc) {
+                CERROR("ost: statfs failed: rc %d\n", rc);
+                req->rq_status = rc;
+                RETURN(rc);
+        }
+        obd_statfs_pack(osfs, osfs);
+
         RETURN(0);
 }
 
-static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_open(struct ptlrpc_request *req)
 {
-        struct obd_conn conn;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*body);
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        conn.oc_id = body->connid;
-        conn.oc_dev = ost->ost_tgt;
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_close(&conn, &repbody->oa);
+        req->rq_status = obd_open(conn, &repbody->oa, NULL);
         RETURN(0);
 }
 
-static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_close(struct ptlrpc_request *req)
 {
-        struct obd_conn conn;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*body);
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        conn.oc_id = body->connid;
-        conn.oc_dev = ost->ost_tgt;
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_create(&conn, &repbody->oa);
+        req->rq_status = obd_close(conn, &repbody->oa, NULL);
         RETURN(0);
 }
 
-static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_create(struct ptlrpc_request *req)
 {
-        struct obd_conn conn;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*body);
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        conn.oc_id = body->connid;
-        conn.oc_dev = ost->ost_tgt;
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_punch(&conn, &repbody->oa,
-                                   repbody->oa.o_blocks, repbody->oa.o_size);
+        req->rq_status = obd_create(conn, &repbody->oa, NULL);
         RETURN(0);
 }
 
-static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_punch(struct ptlrpc_request *req)
 {
-        struct obd_conn conn;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*body);
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        conn.oc_id = body->connid;
-        conn.oc_dev = ost->ost_tgt;
+
+        if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
+            (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
+                RETURN(-EINVAL);
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_setattr(&conn, &repbody->oa);
+        req->rq_status = obd_punch(conn, &repbody->oa, NULL,
+                                   repbody->oa.o_size, repbody->oa.o_blocks);
         RETURN(0);
 }
 
-static int ost_connect(struct ptlrpc_request *req)
+static int ost_setattr(struct ptlrpc_request *req)
 {
-        struct obd_conn conn;
-        struct ost_body *body;
-        struct ost_obd *ost;
-        char *uuid;
-        int rc, size = sizeof(*body), i;
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+        struct ost_body *body, *repbody;
+        int rc, size = sizeof(*body);
         ENTRY;
 
-        uuid = lustre_msg_buf(req->rq_reqmsg, 0);
-        if (req->rq_reqmsg->buflens[0] > 37) {
-                /* Invalid UUID */
-                req->rq_status = -EINVAL;
-                RETURN(0);
-        }
-
-        i = obd_class_name2dev(uuid);
-        if (i == -1) {
-                req->rq_status = -ENODEV;
-                RETURN(0);
-        }
-
-        ost = &(obd_dev[i].u.ost);
-        conn.oc_dev = ost->ost_tgt;
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
-        req->rq_repmsg->target_id = i;
-        req->rq_status = obd_connect(&conn);
-
-        CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
-        body = lustre_msg_buf(req->rq_repmsg, 0);
-        body->connid = conn.oc_id;
+        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: unpack only valid fields instead of memcpy, endianness */
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
         RETURN(0);
 }
 
-static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_bulk_timeout(void *data)
 {
-        struct obd_conn conn;
-        struct ost_body *body;
-        int rc;
-        ENTRY;
-
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        conn.oc_id = body->connid;
-        conn.oc_dev = ost->ost_tgt;
+        struct ptlrpc_bulk_desc *desc = data;
 
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc)
-                RETURN(rc);
-
-        CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
-        req->rq_status = obd_disconnect(&conn);
-        RETURN(0);
-}
-
-static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
-{
-        struct obd_conn conn;
-        struct ost_body *body;
-        int rc, size[2] = {sizeof(*body)};
-        char *bufs[2] = {NULL, NULL}, *ptr;
         ENTRY;
-
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        conn.oc_id = body->connid;
-        conn.oc_dev = ost->ost_tgt;
-
-        ptr = lustre_msg_buf(req->rq_reqmsg, 1);
-        if (!ptr)
-                RETURN(-EINVAL);
-
-        req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr,
-                                      &(size[1]), (void **)&(bufs[1]));
-
-        rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
-        if (rc)
-                CERROR("cannot pack reply\n");
-
-        RETURN(rc);
+        CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
+        RETURN(1);
 }
 
-static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
+static int ost_brw_read(struct ptlrpc_request *req)
 {
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ptlrpc_bulk_desc *desc;
-        struct obd_conn conn;
         void *tmp1, *tmp2, *end2;
         struct niobuf_remote *remote_nb;
         struct niobuf_local *local_nb = NULL;
         struct obd_ioobj *ioo;
         struct ost_body *body;
+        struct l_wait_info lwi;
+        void *desc_priv = NULL;
         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
         ENTRY;
 
@@ -286,10 +239,10 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
-        cmd = body->data;
+        cmd = OBD_BRW_READ;
 
-        conn.oc_id = body->connid;
-        conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
+                GOTO(out, rc = 0);
 
         for (i = 0; i < objcount; i++) {
                 ost_unpack_ioo(&tmp1, &ioo);
@@ -301,68 +254,70 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
                         ost_unpack_niobuf(&tmp2, &remote_nb);
         }
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc)
-                RETURN(rc);
         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
         if (local_nb == NULL)
-                RETURN(-ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         /* The unpackers move tmp1 and tmp2, so reset them before using */
-        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
-        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
-        req->rq_status = obd_preprw(cmd, &conn, objcount,
-                                    tmp1, niocount, tmp2, local_nb, NULL);
+        ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+        remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+        req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
+                                    remote_nb, local_nb, &desc_priv);
 
         if (req->rq_status)
-                GOTO(out_local, 0);
+                GOTO(out, rc = 0);
 
         desc = ptlrpc_prep_bulk(req->rq_connection);
         if (desc == NULL)
                 GOTO(out_local, rc = -ENOMEM);
-        desc->b_portal = OST_BULK_PORTAL;
+        desc->bd_portal = OST_BULK_PORTAL;
 
         for (i = 0; i < niocount; i++) {
-                struct ptlrpc_bulk_page *bulk;
-                bulk = ptlrpc_prep_bulk_page(desc);
+                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
+
                 if (bulk == NULL)
                         GOTO(out_bulk, rc = -ENOMEM);
-                remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
-                bulk->b_xid = remote_nb->xid;
-                bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
-                bulk->b_buflen = PAGE_SIZE;
+                bulk->bp_xid = remote_nb[i].xid;
+                bulk->bp_buf = local_nb[i].addr;
+                bulk->bp_buflen = remote_nb[i].len;
         }
 
         rc = ptlrpc_send_bulk(desc);
         if (rc)
                 GOTO(out_bulk, rc);
 
-        wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
-        if (desc->b_flags & PTL_RPC_FL_INTR)
-                rc = -EINTR;
+        lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+        rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, &lwi);
+        if (rc) {
+                LASSERT(rc == -ETIMEDOUT);
+                GOTO(out_bulk, rc);
+        }
 
-        /* The unpackers move tmp1 and tmp2, so reset them before using */
-        tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
-        tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
-        req->rq_status = obd_commitrw(cmd, &conn, objcount,
-                                      tmp1, niocount, local_nb, NULL);
+        req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
+                                      local_nb, desc_priv);
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
 
 out_bulk:
         ptlrpc_free_bulk(desc);
 out_local:
         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
 out:
-        if (rc)
-                ptlrpc_error(obddev->ost_service, req);
-        else
-                ptlrpc_reply(obddev->ost_service, req);
+        if (rc) {
+                /* It's a lot of work to delay allocating the reply, and a lot
+                 * less work to just free it here. */
+                OBD_FREE(req->rq_repmsg, req->rq_replen);
+                req->rq_repmsg = NULL;
+                ptlrpc_error(req->rq_svc, req);
+        } else
+                ptlrpc_reply(req->rq_svc, req);
         RETURN(rc);
 }
 
-static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
+static int ost_brw_write(struct ptlrpc_request *req)
 {
+        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ptlrpc_bulk_desc *desc;
-        struct obd_conn conn;
         struct niobuf_remote *remote_nb;
         struct niobuf_local *local_nb, *lnb;
         struct obd_ioobj *ioo;
@@ -371,6 +326,9 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
         void *tmp1, *tmp2, *end2;
         void *desc_priv = NULL;
         int reply_sent = 0;
+        struct ptlrpc_service *srv;
+        struct l_wait_info lwi;
+        __u32 xid;
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
@@ -379,10 +337,7 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
-        cmd = body->data;
-
-        conn.oc_id = body->connid;
-        conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+        cmd = OBD_BRW_WRITE;
 
         for (i = 0; i < objcount; i++) {
                 ost_unpack_ioo((void *)&tmp1, &ioo);
@@ -407,41 +362,46 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
         /* The unpackers move tmp1 and tmp2, so reset them before using */
         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
-        req->rq_status = obd_preprw(cmd, &conn, objcount,
-                                    tmp1, niocount, tmp2, local_nb, &desc_priv);
+        req->rq_status = obd_preprw(cmd, conn, objcount, tmp1, niocount, tmp2,
+                                    local_nb, &desc_priv);
         if (req->rq_status)
                 GOTO(out_free, rc = 0); /* XXX is this correct? */
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
+                GOTO(fail_preprw, rc = 0);
+
         desc = ptlrpc_prep_bulk(req->rq_connection);
         if (desc == NULL)
                 GOTO(fail_preprw, rc = -ENOMEM);
-        desc->b_cb = NULL;
-        desc->b_portal = OSC_BULK_PORTAL;
-        desc->b_desc_private = desc_priv;
-        memcpy(&(desc->b_conn), &conn, sizeof(conn));
+        desc->bd_cb = NULL;
+        desc->bd_portal = OSC_BULK_PORTAL;
+        desc->bd_desc_private = desc_priv;
+        memcpy(&(desc->bd_conn), &conn, sizeof(conn));
+
+        srv = req->rq_obd->u.ost.ost_service;
+        spin_lock(&srv->srv_lock);
+        xid = srv->srv_xid++;                   /* single xid for all pages */
+        spin_unlock(&srv->srv_lock);
 
         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
-                struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
                 struct ptlrpc_bulk_page *bulk;
 
                 bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL)
                         GOTO(fail_bulk, rc = -ENOMEM);
 
-                spin_lock(&srv->srv_lock);
-                bulk->b_xid = srv->srv_xid++;
-                spin_unlock(&srv->srv_lock);
+                bulk->bp_xid = xid;              /* single xid for all pages */
 
-                bulk->b_buf = lnb->addr;
-                bulk->b_page = lnb->page;
-                bulk->b_flags = lnb->flags;
-                bulk->b_dentry = lnb->dentry;
-                bulk->b_buflen = PAGE_SIZE;
-                bulk->b_cb = NULL;
+                bulk->bp_buf = lnb->addr;
+                bulk->bp_page = lnb->page;
+                bulk->bp_flags = lnb->flags;
+                bulk->bp_dentry = lnb->dentry;
+                bulk->bp_buflen = lnb->len;
+                bulk->bp_cb = NULL;
 
                 /* this advances remote_nb */
                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
-                                bulk->b_xid);
+                                bulk->bp_xid);
         }
 
         rc = ptlrpc_register_bulk(desc);
@@ -449,23 +409,31 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
                 GOTO(fail_bulk, rc);
 
         reply_sent = 1;
-        ptlrpc_reply(obddev->ost_service, req);
+        ptlrpc_reply(req->rq_svc, req);
 
-        wait_event_interruptible(desc->b_waitq,
-                                 desc->b_flags & PTL_BULK_FL_RCVD);
+        lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+        rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
+                          &lwi);
+        if (rc) {
+                if (rc != -ETIMEDOUT)
+                        LBUG();
+                GOTO(fail_bulk, rc);
+        }
 
-        rc = obd_commitrw(cmd, &conn, objcount, tmp1, niocount, local_nb,
-                          desc->b_desc_private);
+        rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
+                          desc->bd_desc_private);
         ptlrpc_free_bulk(desc);
         EXIT;
 out_free:
         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
 out:
         if (!reply_sent) {
-                if (rc)
-                        ptlrpc_error(obddev->ost_service, req);
-                else
-                        ptlrpc_reply(obddev->ost_service, req);
+                if (rc) {
+                        OBD_FREE(req->rq_repmsg, req->rq_replen);
+                        req->rq_repmsg = NULL;
+                        ptlrpc_error(req->rq_svc, req);
+                } else
+                        ptlrpc_reply(req->rq_svc, req);
         }
         return rc;
 
@@ -476,114 +444,121 @@ fail_preprw:
         goto out_free;
 }
 
-static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
-{
-        struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
-
-        if (body->data & OBD_BRW_WRITE)
-                return ost_brw_write(obddev, req);
-        else
-                return ost_brw_read(obddev, req);
-}
-
-static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
-                      struct ptlrpc_request *req)
+static int ost_handle(struct ptlrpc_request *req)
 {
         int rc;
-        struct ost_obd *ost = NULL;
         ENTRY;
 
         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
-                CERROR("lustre_mds: Invalid request\n");
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
+                CERROR("lustre_ost: Invalid request\n");
                 GOTO(out, rc);
         }
 
-        if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
-                CERROR("lustre_mds: wrong packet type sent %d\n",
-                       req->rq_reqmsg->type);
-                GOTO(out, rc = -EINVAL);
+        if (req->rq_reqmsg->opc != OST_CONNECT &&
+            req->rq_export == NULL) {
+                CERROR("lustre_ost: operation %d on unconnected OST\n",
+                       req->rq_reqmsg->opc);
+                GOTO(out, rc = -ENOTCONN);
         }
 
-        if (req->rq_reqmsg->opc != OST_CONNECT) {
-                int id = req->rq_reqmsg->target_id;
-                struct obd_device *obddev;
-                if (id < 0 || id > MAX_OBD_DEVICES)
-                        GOTO(out, rc = -ENODEV);
-                obddev = &obd_dev[id];
-                if (strcmp(obddev->obd_type->typ_name, "ost") != 0)
-                        GOTO(out, rc = -EINVAL);
-                ost = &obddev->u.ost;
-                req->rq_obd = obddev;
-        }
+        if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
+                GOTO(out, rc = -EINVAL);
 
         switch (req->rq_reqmsg->opc) {
         case OST_CONNECT:
                 CDEBUG(D_INODE, "connect\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
-                rc = ost_connect(req);
+                rc = target_handle_connect(req);
                 break;
         case OST_DISCONNECT:
                 CDEBUG(D_INODE, "disconnect\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
-                rc = ost_disconnect(ost, req);
-                break;
-        case OST_GET_INFO:
-                CDEBUG(D_INODE, "get_info\n");
-                OBD_FAIL_RETURN(OBD_FAIL_OST_GET_INFO_NET, 0);
-                rc = ost_get_info(ost, req);
+                rc = target_handle_disconnect(req);
                 break;
         case OST_CREATE:
                 CDEBUG(D_INODE, "create\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
-                rc = ost_create(ost, req);
+                rc = ost_create(req);
                 break;
         case OST_DESTROY:
                 CDEBUG(D_INODE, "destroy\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
-                rc = ost_destroy(ost, req);
+                rc = ost_destroy(req);
                 break;
         case OST_GETATTR:
                 CDEBUG(D_INODE, "getattr\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
-                rc = ost_getattr(ost, req);
+                rc = ost_getattr(req);
                 break;
         case OST_SETATTR:
                 CDEBUG(D_INODE, "setattr\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
-                rc = ost_setattr(ost, req);
+                rc = ost_setattr(req);
                 break;
         case OST_OPEN:
-                CDEBUG(D_INODE, "setattr\n");
+                CDEBUG(D_INODE, "open\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
-                rc = ost_open(ost, req);
+                rc = ost_open(req);
                 break;
         case OST_CLOSE:
-                CDEBUG(D_INODE, "setattr\n");
+                CDEBUG(D_INODE, "close\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
-                rc = ost_close(ost, req);
+                rc = ost_close(req);
                 break;
-        case OST_BRW:
-                CDEBUG(D_INODE, "brw\n");
+        case OST_WRITE:
+                CDEBUG(D_INODE, "write\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
+                rc = ost_brw_write(req);
+                /* ost_brw sends its own replies */
+                RETURN(rc);
+        case OST_READ:
+                CDEBUG(D_INODE, "read\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
-                rc = ost_brw(ost, req);
+                rc = ost_brw_read(req);
                 /* ost_brw sends its own replies */
                 RETURN(rc);
         case OST_PUNCH:
                 CDEBUG(D_INODE, "punch\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
-                rc = ost_punch(ost, req);
+                rc = ost_punch(req);
                 break;
-#if 0
         case OST_STATFS:
                 CDEBUG(D_INODE, "statfs\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
-                rc = ost_statfs(ost, req);
+                rc = ost_statfs(req);
+                break;
+        case LDLM_ENQUEUE:
+                CDEBUG(D_INODE, "enqueue\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+                rc = ldlm_handle_enqueue(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_CONVERT:
+                CDEBUG(D_INODE, "convert\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+                rc = ldlm_handle_convert(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_CANCEL:
+                CDEBUG(D_INODE, "cancel\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
+                rc = ldlm_handle_cancel(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_BL_CALLBACK:
+        case LDLM_CP_CALLBACK:
+                CDEBUG(D_INODE, "callback\n");
+                CERROR("callbacks should not happen on OST\n");
+                LBUG();
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                 break;
-#endif
         default:
                 req->rq_status = -ENOTSUPP;
-                rc = ptlrpc_error(svc, req);
+                rc = ptlrpc_error(req->rq_svc, req);
                 RETURN(rc);
         }
 
@@ -591,16 +566,23 @@ static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
 out:
         //req->rq_status = rc;
         if (rc) {
-                CERROR("ost: processing error %d\n", rc);
-                ptlrpc_error(svc, req);
+                CERROR("ost: processing error (opcode=%d): %d\n",
+                       req->rq_reqmsg->opc, rc);
+                ptlrpc_error(req->rq_svc, req);
         } else {
                 CDEBUG(D_INODE, "sending reply\n");
-                ptlrpc_reply(svc, req);
+                if (req->rq_repmsg == NULL)
+                        CERROR("handler for opcode %d returned rc=0 without "
+                               "creating rq_repmsg; needs to return rc != "
+                               "0!\n", req->rq_reqmsg->opc);
+                ptlrpc_reply(req->rq_svc, req);
         }
 
         return 0;
 }
 
+#define OST_NUM_THREADS 6
+
 /* mount the file system (secretly) */
 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
@@ -608,58 +590,51 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
         struct ost_obd *ost = &obddev->u.ost;
         struct obd_device *tgt;
         int err;
+        int i;
         ENTRY;
 
-        if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
-                RETURN(-ENODEV);
+        if (data->ioc_inllen1 < 1) {
+                CERROR("requires a TARGET OBD UUID\n");
+                RETURN(-EINVAL);
+        }
+        if (data->ioc_inllen1 > 37) {
+                CERROR("OBD UUID must be less than 38 characters\n");
+                RETURN(-EINVAL);
+        }
 
         MOD_INC_USE_COUNT;
-        tgt = &obd_dev[data->ioc_dev];
-        ost->ost_tgt = tgt;
-        if (!(tgt->obd_flags & OBD_ATTACHED) ||
+        tgt = class_uuid2obd(data->ioc_inlbuf1);
+        if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
             !(tgt->obd_flags & OBD_SET_UP)) {
                 CERROR("device not attached or not set up (%d)\n",
                        data->ioc_dev);
                 GOTO(error_dec, err = -EINVAL);
         }
 
-        ost->ost_conn.oc_dev = tgt;
-        err = obd_connect(&ost->ost_conn);
+        err = obd_connect(&ost->ost_conn, tgt, NULL, NULL, NULL);
         if (err) {
                 CERROR("fail to connect to device %d\n", data->ioc_dev);
                 GOTO(error_dec, err = -EINVAL);
         }
 
-        obddev->obd_namespace =
-                ldlm_namespace_new("ost", LDLM_NAMESPACE_SERVER);
-        if (obddev->obd_namespace == NULL)
-                LBUG();
-
-        ost->ost_service = ptlrpc_init_svc(64 * 1024, OST_REQUEST_PORTAL,
-                                           OSC_REPLY_PORTAL, "self",ost_handle);
+        ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
+                                           OST_BUFSIZE, OST_MAXREQSIZE,
+                                           OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 
+                                           "self", ost_handle, "ost");
         if (!ost->ost_service) {
                 CERROR("failed to start service\n");
                 GOTO(error_disc, err = -EINVAL);
         }
 
-        err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
-        if (err)
-                GOTO(error_disc, err = -EINVAL);
-        err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
-        if (err)
-                GOTO(error_disc, err = -EINVAL);
-        err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
-        if (err)
-                GOTO(error_disc, err = -EINVAL);
-        err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
-        if (err)
-                GOTO(error_disc, err = -EINVAL);
-        err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
-        if (err)
-                GOTO(error_disc, err = -EINVAL);
-        err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
-        if (err)
-                GOTO(error_disc, err = -EINVAL);
+        for (i = 0; i < OST_NUM_THREADS; i++) {
+                char name[32];
+                sprintf(name, "lustre_ost_%02d", i);
+                err = ptlrpc_start_thread(obddev, ost->ost_service, name);
+                if (err) {
+                        CERROR("error starting thread #%d: rc %d\n", i, err);
+                        GOTO(error_disc, err = -EINVAL);
+                }
+        }
 
         RETURN(0);
 
@@ -677,7 +652,7 @@ static int ost_cleanup(struct obd_device * obddev)
 
         ENTRY;
 
-        if ( !list_empty(&obddev->obd_gen_clients) ) {
+        if ( !list_empty(&obddev->obd_exports) ) {
                 CERROR("still has clients!\n");
                 RETURN(-EBUSY);
         }
@@ -691,30 +666,57 @@ static int ost_cleanup(struct obd_device * obddev)
                 RETURN(-EINVAL);
         }
 
-        ldlm_namespace_free(obddev->obd_namespace);
-
         MOD_DEC_USE_COUNT;
         RETURN(0);
 }
+int ost_attach(struct obd_device *dev, 
+                   obd_count len, void *data)
+{
+        /*  lprocfs_reg_dev(dev, (lprocfs_group_t*)lprocfs_ptlrpc_nm,
+                        sizeof(struct lprofiler_ptlrpc));
+        */
+        lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
+        return 0; 
+}
+
+int ost_detach(struct obd_device *dev)
+{
+        /* lprocfs_dereg_dev(dev); */
+        lprocfs_dereg_obd(dev);
+        return 0;
+
+}
+
+
 
 /* use obd ops to offer management infrastructure */
 static struct obd_ops ost_obd_ops = {
+        o_attach:      ost_attach,
+        o_detach:      ost_detach,
         o_setup:       ost_setup,
         o_cleanup:     ost_cleanup,
 };
 
 static int __init ost_init(void)
 {
-        obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
+        int rc;
+
+        rc = class_register_type(&ost_obd_ops,
+                                 (lprocfs_vars_t*)status_class_var, 
+                                 LUSTRE_OST_NAME);
+        if (rc) RETURN(rc);
+
         return 0;
+
 }
 
 static void __exit ost_exit(void)
 {
-        obd_unregister_type(LUSTRE_OST_NAME);
+        
+        class_unregister_type(LUSTRE_OST_NAME);
 }
 
-MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
 MODULE_LICENSE("GPL");