Whamcloud - gitweb
- documentation update for MDS recovery
[fs/lustre-release.git] / lustre / osc / osc_request.c
index c74462e..7b33e6c 100644 (file)
-/* 
- * Copryright (C) 2001 Cluster File Systems, Inc.
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
  *
  *  This code is issued under the GNU General Public License.
  *  See the file COPYING in this distribution
  *
  *  Author Peter Braam <braam@clusterfs.com>
- * 
+ *
  *  This server is single threaded at present (but can easily be multi
  *  threaded). For testing and management it is treated as an
  *  obd_device, although it does not export a full OBD method table
  *  (the requests are coming in over the wire, so object target
  *  modules do not have a full method table.)
- * 
+ *
  */
 
 #define EXPORT_SYMTAB
+#define DEBUG_SUBSYSTEM S_OSC
 
-#include <linux/config.h>
 #include <linux/module.h>
-#include <linux/kernel.h>
-#include <linux/mm.h>
-#include <linux/string.h>
-#include <linux/stat.h>
-#include <linux/errno.h>
-#include <linux/locks.h>
-#include <linux/unistd.h>
-
-#include <asm/system.h>
-#include <asm/uaccess.h>
-
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <asm/uaccess.h>
-#include <linux/vmalloc.h>
-#include <asm/segment.h>
-#include <linux/miscdevice.h>
-
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_idl.h>
-
-extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
-
-/* FIXME: this belongs in some sort of service struct */
-static int osc_xid = 1;
-
-struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1, 
-                                int buflen2, char *buf2)
+#include <linux/lustre_dlm.h>
+#include <linux/obd_ost.h>
+
+static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
+                       struct ptlrpc_connection **connection)
 {
-       struct ptlrpc_request *request;
-       int rc;
-       ENTRY; 
-
-       request = (struct ptlrpc_request *)kmalloc(sizeof(*request), GFP_KERNEL); 
-       if (!request) { 
-               printk("osc_prep_req: request allocation out of memory\n");
-               return NULL;
-       }
-
-       memset(request, 0, sizeof(*request));
-       request->rq_xid = osc_xid++;
-
-       rc = ost_pack_req(buf1, buflen1,  buf2, buflen2,
-                         &request->rq_reqhdr, &request->rq_req.ost, 
-                         &request->rq_reqlen, &request->rq_reqbuf);
-       if (rc) { 
-               printk("llight request: cannot pack request %d\n", rc); 
-               return NULL;
-       }
-       request->rq_reqhdr->opc = opcode;
-
-       EXIT;
-       return request;
+        struct osc_obd *osc = &conn->oc_dev->u.osc;
+        *cl = osc->osc_client;
+        *connection = osc->osc_conn;
 }
 
-/* XXX: unify with mdc_queue_wait */
-extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
+static int osc_connect(struct obd_conn *conn)
 {
-       struct obd_device *client = conn->oc_dev;
-       struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
-       int rc;
-
-       ENTRY;
-
-       /* set the connection id */
-       req->rq_req.ost->connid = conn->oc_id;
-
-       /* XXX fix the race here (wait_for_event?)*/
-       if (peer == NULL) {
-               /* Local delivery */
-               CDEBUG(D_INODE, "\n");
-               rc = ost_queue_req(client, req); 
-       } else {
-               /* Remote delivery via portals. */
-               req->rq_req_portal = OST_REQUEST_PORTAL;
-               req->rq_reply_portal = OST_REPLY_PORTAL;
-               rc = ptl_send_rpc(req, peer);
-       }
-       if (rc) { 
-               printk(__FUNCTION__ ": error %d, opcode %d\n", rc, 
-                      req->rq_reqhdr->opc); 
-               return -rc;
-       }
-
-       CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
-              &conn->oc_dev->u.osc.osc_tgt->u.ost, 
-              conn->oc_id, req->rq_reqhdr->opc, req);
-
-       /* wait for the reply */
-       init_waitqueue_head(&req->rq_wait_for_rep);
-       CDEBUG(D_INODE, "-- sleeping\n");
-       interruptible_sleep_on(&req->rq_wait_for_rep);
-       CDEBUG(D_INODE, "-- done\n");
-
-       rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
-                           &req->rq_rep.ost); 
-       if (rc) {
-               printk(__FUNCTION__ ": mds_unpack_rep failed: %d\n", rc);
-               return rc;
-       }
-
-       if ( req->rq_rephdr->status == 0 )
-               CDEBUG(D_INODE, "buf %p len %d status %d\n", 
-                      req->rq_repbuf, req->rq_replen, 
-                      req->rq_rephdr->status); 
-
-       EXIT;
-       return 0;
+        struct ptlrpc_request *request;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        request->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out, rc);
+
+        body = lustre_msg_buf(request->rq_repmsg, 0);
+        CDEBUG(D_INODE, "received connid %d\n", body->connid);
+
+        conn->oc_id = body->connid;
+        EXIT;
+ out:
+        ptlrpc_free_req(request);
+        return rc;
 }
 
-void osc_free_req(struct ptlrpc_request *request)
+static int osc_disconnect(struct obd_conn *conn)
 {
-       kfree(request);
+        struct ptlrpc_request *request;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        body->connid = conn->oc_id;
+
+        request->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(request);
+        GOTO(out, rc);
+ out:
+        ptlrpc_free_req(request);
+        return rc;
 }
 
-int osc_connect(struct obd_conn *conn)
+static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
 {
-       struct ptlrpc_request *request;
-       int rc; 
-       ENTRY;
-       
-       request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
-       if (!request) { 
-               printk(__FUNCTION__ ": cannot pack req!\n"); 
-               return -ENOMEM;
-       }
-
-       request->rq_replen = 
-               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
-
-       rc = osc_queue_wait(conn, request);
-       if (rc) { 
-               EXIT;
-               goto out;
-       }
-      
-       CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); 
-
-       conn->oc_id = request->rq_rep.ost->connid;
+        struct ptlrpc_request *request;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        memcpy(&body->oa, oa, sizeof(*oa));
+        body->connid = conn->oc_id;
+        body->oa.o_valid = ~0;
+
+        request->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out, rc);
+
+        body = lustre_msg_buf(request->rq_repmsg, 0);
+        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
+        if (oa)
+                memcpy(oa, &body->oa, sizeof(*oa));
+
+        EXIT;
  out:
-       osc_free_req(request);
-       EXIT;
-       return rc;
+        ptlrpc_free_req(request);
+        return 0;
 }
 
-int osc_disconnect(struct obd_conn *conn)
+static int osc_open(struct obd_conn *conn, struct obdo *oa)
 {
-       struct ptlrpc_request *request;
-       int rc; 
-       ENTRY;
-       
-       request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
-       if (!request) { 
-               printk(__FUNCTION__ ": cannot pack req!\n"); 
-               return -ENOMEM;
-       }
-
-       request->rq_replen = 
-               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
-
-       rc = osc_queue_wait(conn, request);
-       if (rc) { 
-               EXIT;
-               goto out;
-       }
+        struct ptlrpc_request *request;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        memcpy(&body->oa, oa, sizeof(*oa));
+        body->connid = conn->oc_id;
+        if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
+                LBUG();
+
+        request->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out, rc);
+
+        body = lustre_msg_buf(request->rq_repmsg, 0);
+        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
+        if (oa)
+                memcpy(oa, &body->oa, sizeof(*oa));
+
+        EXIT;
  out:
-       osc_free_req(request);
-       EXIT;
-       return rc;
+        ptlrpc_free_req(request);
+        return 0;
 }
 
-
-int osc_getattr(struct obd_conn *conn, struct obdo *oa)
+static int osc_close(struct obd_conn *conn, struct obdo *oa)
 {
-       struct ptlrpc_request *request;
-       int rc; 
-
-       request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
-       if (!request) { 
-               printk(__FUNCTION__ ": cannot pack req!\n"); 
-               return -ENOMEM;
-       }
-       
-       memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
-       request->rq_req.ost->oa.o_valid = ~0;
-       request->rq_replen = 
-               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
-       
-       rc = osc_queue_wait(conn, request);
-       if (rc) { 
-               EXIT;
-               goto out;
-       }
-
-       CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
-       if (oa) { 
-               memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
-       }
+        struct ptlrpc_request *request;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        memcpy(&body->oa, oa, sizeof(*oa));
+        body->connid = conn->oc_id;
+
+        request->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out, rc);
 
+        body = lustre_msg_buf(request->rq_repmsg, 0);
+        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
+        if (oa)
+                memcpy(oa, &body->oa, sizeof(*oa));
+
+        EXIT;
  out:
-       osc_free_req(request);
-       return 0;
+        ptlrpc_free_req(request);
+        return 0;
 }
 
-int osc_setattr(struct obd_conn *conn, struct obdo *oa)
+static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
 {
-       struct ptlrpc_request *request;
-       int rc; 
-
-       request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
-       if (!request) { 
-               printk(__FUNCTION__ ": cannot pack req!\n"); 
-               return -ENOMEM;
-       }
-       
-       memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
-       request->rq_replen = 
-               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
-       
-       rc = osc_queue_wait(conn, request);
-       if (rc) { 
-               EXIT;
-               goto out;
-       }
+        struct ptlrpc_request *request;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        memcpy(&body->oa, oa, sizeof(*oa));
+        body->connid = conn->oc_id;
+
+        request->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(request);
+        GOTO(out, rc);
 
  out:
-       osc_free_req(request);
-       return 0;
+        ptlrpc_free_req(request);
+        return 0;
 }
 
-int osc_create(struct obd_conn *conn, struct obdo *oa)
+static int osc_create(struct obd_conn *conn, struct obdo *oa)
 {
-       struct ptlrpc_request *request;
-       int rc; 
-
-       if (!oa) { 
-               printk(__FUNCTION__ ": oa NULL\n"); 
-       }
-       request = ost_prep_req(OST_CREATE, 0, NULL, 0, NULL);
-       if (!request) { 
-               printk("osc_connect: cannot pack req!\n"); 
-               return -ENOMEM;
-       }
-       
-       memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
-       request->rq_req.ost->oa.o_valid = ~0;
-       request->rq_replen = 
-               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
-       
-       rc = osc_queue_wait(conn, request);
-       if (rc) { 
-               EXIT;
-               goto out;
-       }
-       memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
+        struct ptlrpc_request *request;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        if (!oa) {
+                CERROR("oa NULL\n");
+                RETURN(-EINVAL);
+        }
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        memcpy(&body->oa, oa, sizeof(*oa));
+        body->oa.o_valid = ~0;
+        body->connid = conn->oc_id;
+
+        request->rq_replen = lustre_msg_size(1, &size);
 
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out, rc);
+
+        body = lustre_msg_buf(request->rq_repmsg, 0);
+        memcpy(oa, &body->oa, sizeof(*oa));
+
+        EXIT;
  out:
-       osc_free_req(request);
-       return 0;
+        ptlrpc_free_req(request);
+        return 0;
 }
 
+static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
+                     obd_off offset)
+{
+        struct ptlrpc_request *request;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        if (!oa) {
+                CERROR("oa NULL\n");
+                RETURN(-EINVAL);
+        }
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
 
-/* mount the file system (secretly) */
-static int osc_setup(struct obd_device *obddev, obd_count len,
-                       void *buf)
-                       
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        memcpy(&body->oa, oa, sizeof(*oa));
+        body->connid = conn->oc_id;
+        body->oa.o_valid = ~0;
+        body->oa.o_size = offset;
+        body->oa.o_blocks = count;
+
+        request->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out, rc);
+
+        body = lustre_msg_buf(request->rq_repmsg, 0);
+        memcpy(oa, &body->oa, sizeof(*oa));
+
+        EXIT;
+ out:
+        ptlrpc_free_req(request);
+        return 0;
+}
+
+static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
 {
-       struct obd_ioctl_data* data = buf;
-       struct osc_obd *osc = &obddev->u.osc;
+        struct ptlrpc_request *request;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
         ENTRY;
 
-       if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
-               /* This is a local connection */
-               osc->osc_tgt = &obd_dev[data->ioc_dev];
-
-               printk("OSC: tgt %d ost at %p\n", data->ioc_dev,
-                      &osc->osc_tgt->u.ost);
-               if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
-                    ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
-                       printk("device not attached or not set up (%d)\n", 
-                              data->ioc_dev);
-                       EXIT;
-                       return -EINVAL;
-               }
-       } else {
-               int err;
-               /* This is a remote connection using Portals */
-
-               /* XXX: this should become something like ioc_inlbuf1 */
-               err = kportal_uuid_to_peer("ost", &osc->osc_peer);
-               if (err != 0) {
-                       printk("Cannot find 'ost' peer.\n");
-                       EXIT;
-                       return -EINVAL;
-               }
-       }
+        if (!oa) {
+                CERROR("oa NULL\n");
+                RETURN(-EINVAL);
+        }
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        memcpy(&body->oa, oa, sizeof(*oa));
+        body->connid = conn->oc_id;
+        body->oa.o_valid = ~0;
+
+        request->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out, rc);
+
+        body = lustre_msg_buf(request->rq_repmsg, 0);
+        memcpy(oa, &body->oa, sizeof(*oa));
 
-        MOD_INC_USE_COUNT;
         EXIT;
+ out:
+        ptlrpc_free_req(request);
+        return 0;
+}
+
+int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
+                 struct niobuf *dst, struct niobuf *src)
+{
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+
+        osc_con2cl(conn, &cl, &connection);
+
+        if (cl->cli_obd) {
+                /* local sendpage */
+                memcpy((char *)(unsigned long)dst->addr,
+                       (char *)(unsigned long)src->addr, src->len);
+        } else {
+                struct ptlrpc_bulk_desc *bulk;
+                int rc;
+
+                bulk = ptlrpc_prep_bulk(connection);
+                if (bulk == NULL)
+                        RETURN(-ENOMEM);
+
+                bulk->b_buf = (void *)(unsigned long)src->addr;
+                bulk->b_buflen = src->len;
+                bulk->b_xid = dst->xid;
+                rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
+                if (rc != 0) {
+                        CERROR("send_bulk failed: %d\n", rc);
+                        ptlrpc_free_bulk(bulk);
+                        LBUG();
+                        RETURN(rc);
+                }
+                wait_event_interruptible(bulk->b_waitq,
+                                         ptlrpc_check_bulk_sent(bulk));
+
+                if (bulk->b_flags & PTL_RPC_FL_INTR) {
+                        ptlrpc_free_bulk(bulk);
+                        RETURN(-EINTR);
+                }
+
+                ptlrpc_free_bulk(bulk);
+        }
+
         return 0;
-} 
+}
 
-void osc_sendpage(struct niobuf *dst, struct niobuf *src)
+int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
+                 obd_count *oa_bufs, struct page **buf, obd_size *count,
+                 obd_off *offset, obd_flag *flags)
 {
-       memcpy((char *)(unsigned long)dst->addr,  
-              (char *)(unsigned long)src->addr, 
-              src->len);
-       return;
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ptlrpc_request *request;
+        struct ost_body *body;
+        struct obd_ioobj ioo;
+        struct niobuf src;
+        int pages, rc, i, j, size[3] = {sizeof(*body)};
+        void *ptr1, *ptr2;
+        struct ptlrpc_bulk_desc **bulk;
+        ENTRY;
+
+        size[1] = num_oa * sizeof(ioo);
+        pages = 0;
+        for (i = 0; i < num_oa; i++)
+                pages += oa_bufs[i];
+        size[2] = pages * sizeof(src);
+
+        OBD_ALLOC(bulk, pages * sizeof(*bulk));
+        if (bulk == NULL)
+                RETURN(-ENOMEM);
+
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
+        if (!request)
+                GOTO(out, rc = -ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        body->data = OBD_BRW_READ;
+
+        ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
+        ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
+        for (pages = 0, i = 0; i < num_oa; i++) {
+                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+                for (j = 0; j < oa_bufs[i]; j++, pages++) {
+                        bulk[pages] = ptlrpc_prep_bulk(connection);
+                        if (bulk[pages] == NULL)
+                                GOTO(out, rc = -ENOMEM);
+
+                        spin_lock(&connection->c_lock);
+                        bulk[pages]->b_xid = ++connection->c_xid_out;
+                        spin_unlock(&connection->c_lock);
+
+                        bulk[pages]->b_buf = kmap(buf[pages]);
+                        bulk[pages]->b_buflen = PAGE_SIZE;
+                        bulk[pages]->b_portal = OST_BULK_PORTAL;
+                        ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
+                                        offset[pages], count[pages],
+                                        flags[pages], bulk[pages]->b_xid);
+
+                        rc = ptlrpc_register_bulk(bulk[pages]);
+                        if (rc)
+                                GOTO(out, rc);
+                }
+        }
+
+        request->rq_replen = lustre_msg_size(1, size);
+        rc = ptlrpc_queue_wait(request);
+        GOTO(out, rc);
+
+ out:
+        /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
+         * abort those bulk listeners. */
+
+        for (pages = 0, i = 0; i < num_oa; i++) {
+                for (j = 0; j < oa_bufs[i]; j++, pages++) {
+                        if (bulk[pages] == NULL)
+                                continue;
+                        kunmap(buf[pages]);
+                        ptlrpc_free_bulk(bulk[pages]);
+                }
+        }
+
+        OBD_FREE(bulk, pages * sizeof(*bulk));
+        ptlrpc_free_req(request);
+        return rc;
 }
 
+int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
+                  obd_count *oa_bufs, struct page **buf, obd_size *count,
+                  obd_off *offset, obd_flag *flags)
+{
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+        struct ptlrpc_request *request;
+        struct obd_ioobj ioo;
+        struct ost_body *body;
+        struct niobuf *src;
+        int pages, rc, i, j, size[3] = {sizeof(*body)};
+        void *ptr1, *ptr2;
+        ENTRY;
+
+        size[1] = num_oa * sizeof(ioo);
+        pages = 0;
+        for (i = 0; i < num_oa; i++)
+                pages += oa_bufs[i];
+        size[2] = pages * sizeof(*src);
+
+        OBD_ALLOC(src, size[2]);
+        if (!src)
+                RETURN(-ENOMEM);
+
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        body->data = OBD_BRW_WRITE;
+
+        ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
+        ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
+        for (pages = 0, i = 0; i < num_oa; i++) {
+                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+                for (j = 0; j < oa_bufs[i]; j++, pages++) {
+                        ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
+                                        count[pages], flags[pages], 0);
+                }
+        }
+        memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
+
+        size[1] = pages * sizeof(struct niobuf);
+        request->rq_replen = lustre_msg_size(2, size);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out, rc);
+
+        ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
+        if (ptr2 == NULL)
+                GOTO(out, rc = -EINVAL);
+
+        if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
+                CERROR("buffer length wrong (%d vs. %d)\n",
+                       request->rq_repmsg->buflens[1],
+                       pages * sizeof(struct niobuf));
+                GOTO(out, rc = -EINVAL);
+        }
+
+        for (pages = 0, i = 0; i < num_oa; i++) {
+                for (j = 0; j < oa_bufs[i]; j++, pages++) {
+                        struct niobuf *dst;
+                        ost_unpack_niobuf(&ptr2, &dst);
+                        osc_sendpage(conn, request, dst, &src[pages]);
+                }
+        }
+        OBD_FREE(src, size[2]);
+ out:
+        for (pages = 0, i = 0; i < num_oa; i++)
+                for (j = 0; j < oa_bufs[i]; j++, pages++)
+                        kunmap(buf[pages]);
+
+        ptlrpc_free_req(request);
+        return 0;
+}
 
 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
-             struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-             obd_size *count, obd_off *offset, obd_flag *flags)
+              struct obdo **oa, obd_count *oa_bufs, struct page **buf,
+              obd_size *count, obd_off *offset, obd_flag *flags)
 {
-       struct ptlrpc_request *request;
-       int rc; 
-       struct obd_ioobj ioo;
-       struct niobuf src;
-       int size1, size2 = 0; 
-       void *ptr1, *ptr2;
-       int i, j, n;
-
-       size1 = num_oa * sizeof(ioo); 
-       for (i = 0; i < num_oa; i++) { 
-               size2 += oa_bufs[i] * sizeof(src);
-       }
-
-       request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL);
-       if (!request) { 
-               printk("osc_connect: cannot pack req!\n"); 
-               return -ENOMEM;
-       }
-
-       n = 0;
-       request->rq_req.ost->cmd = rw;
-       ptr1 = ost_req_buf1(request->rq_req.ost);
-       ptr2 = ost_req_buf2(request->rq_req.ost);
-       for (i=0; i < num_oa; i++) { 
-               ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
-               for (j = 0 ; j < oa_bufs[i] ; j++) { 
-                       ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
-                                       count[n], flags[n]); 
-                       n++;
-               }
-       }
-
-       request->rq_replen = 
-               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2;
-
-       rc = osc_queue_wait(conn, request);
-       if (rc) { 
-               EXIT;
-               goto out;
-       }
-
-#if 0
-       ptr2 = ost_rep_buf2(request->rq_rep.ost); 
-       if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { 
-               printk(__FUNCTION__ ": buffer length wrong\n"); 
-               goto out;
-       }
-
-       if (rw == OBD_BRW_READ)
-               goto out;
-
-       for (i=0; i < num_oa; i++) { 
-               for (j = 0 ; j < oa_bufs[i] ; j++) { 
-                       struct niobuf *dst;
-                       src.addr = (__u64)(unsigned long)buf[n];
-                       src.len = count[n];
-                       ost_unpack_niobuf(&ptr2, &dst);
-                       osc_sendpage(dst, &src);
-                       n++;
-               }
-       }
-#endif
+        if (rw == OBD_BRW_READ)
+                return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
+                                    offset, flags);
+        else
+                return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
+                                     offset, flags);
+}
 
- out:
-       if (request->rq_rephdr)
-               kfree(request->rq_rephdr);
-       n = 0;
-       for (i=0; i < num_oa; i++) { 
-               for (j = 0 ; j < oa_bufs[i] ; j++) { 
-                       kunmap(buf[n]);
-                       n++;
-               }
-       }
-
-       osc_free_req(request);
-       return 0;
+static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
+{
+        struct osc_obd *osc = &obddev->u.osc;
+        int rc;
+        ENTRY;
+
+        osc->osc_conn = ptlrpc_uuid_to_connection("ost");
+        if (!osc->osc_conn)
+                RETURN(-EINVAL);
+
+        OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
+        if (osc->osc_client == NULL)
+                GOTO(out_conn, rc = -ENOMEM);
+
+        OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
+        if (osc->osc_ldlm_client == NULL)
+                GOTO(out_client, rc = -ENOMEM);
+
+        ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
+                           osc->osc_client);
+        ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                           osc->osc_ldlm_client);
+
+        MOD_INC_USE_COUNT;
+        RETURN(0);
+
+ out_client:
+        OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
+ out_conn:
+        ptlrpc_put_connection(osc->osc_conn);
+        return rc;
 }
 
 static int osc_cleanup(struct obd_device * obddev)
 {
+        struct osc_obd *osc = &obddev->u.osc;
+
+        ptlrpc_cleanup_client(osc->osc_client);
+        OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
+        ptlrpc_cleanup_client(osc->osc_ldlm_client);
+        OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
+        ptlrpc_put_connection(osc->osc_conn);
+
         MOD_DEC_USE_COUNT;
         return 0;
 }
 
-struct obd_ops osc_obd_ops = { 
-       o_setup:   osc_setup,
-       o_cleanup: osc_cleanup, 
-       o_create: osc_create,
-       o_getattr: osc_getattr,
-       o_setattr: osc_setattr,
-       o_connect: osc_connect,
-       o_disconnect: osc_disconnect,
-       o_brw: osc_brw
+struct obd_ops osc_obd_ops = {
+        o_setup:   osc_setup,
+        o_cleanup: osc_cleanup,
+        o_create: osc_create,
+        o_destroy: osc_destroy,
+        o_getattr: osc_getattr,
+        o_setattr: osc_setattr,
+        o_open: osc_open,
+        o_close: osc_close,
+        o_connect: osc_connect,
+        o_disconnect: osc_disconnect,
+        o_brw: osc_brw,
+        o_punch: osc_punch
 };
 
 static int __init osc_init(void)
 {
         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
-       return 0;
+        return 0;
 }
 
 static void __exit osc_exit(void)
 {
-       obd_unregister_type(LUSTRE_OSC_NAME);
+        obd_unregister_type(LUSTRE_OSC_NAME);
 }
 
 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
-MODULE_LICENSE("GPL"); 
+MODULE_LICENSE("GPL");
 
 module_init(osc_init);
 module_exit(osc_exit);
-