Whamcloud - gitweb
Merged branch 'peter' with the tip. Pre-merge tag is 't_20020302_networking'.
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 1b9dd03..f6bd069 100644 (file)
@@ -1,4 +1,6 @@
-/* 
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
  * Copryright (C) 2001 Cluster File Systems, Inc.
  *
  *  This code is issued under the GNU General Public License.
 #include <linux/lustre_lib.h>
 #include <linux/lustre_idl.h>
 
-extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
-
-/* FIXME: this belongs in some sort of service struct */
-static int osc_xid = 1;
-
-struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1, 
-                                int buflen2, char *buf2)
-{
-       struct ptlrpc_request *request;
-       int rc;
-       ENTRY; 
-
-       OBD_ALLOC(request, sizeof(*request));
-       if (!request) { 
-               CERROR("request allocation out of memory\n");
-               return NULL;
-       }
-
-       memset(request, 0, sizeof(*request));
-       request->rq_xid = osc_xid++;
-
-       rc = ost_pack_req(buf1, buflen1,  buf2, buflen2,
-                         &request->rq_reqhdr, &request->rq_req.ost, 
-                         &request->rq_reqlen, &request->rq_reqbuf);
-       if (rc) { 
-               CERROR("llight request: cannot pack request %d\n", rc); 
-               return NULL;
-       }
-       request->rq_reqhdr->opc = opcode;
-
-       EXIT;
-       return request;
-}
-
-/* XXX: unify with mdc_queue_wait */
-extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
+struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
 {
-       struct obd_device *client = conn->oc_dev;
-       struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
-       int rc;
-        DECLARE_WAITQUEUE(wait, current);
-
-       ENTRY;
-
-       /* set the connection id */
-       req->rq_req.ost->connid = conn->oc_id;
-       init_waitqueue_head(&req->rq_wait_for_rep);
-
-       /* XXX fix the race here (wait_for_event?)*/
-       if (peer == NULL) {
-               /* Local delivery */
-               CDEBUG(D_INODE, "\n");
-               rc = ost_queue_req(client, req); 
-       } else {
-               /* Remote delivery via portals. */
-               req->rq_req_portal = OST_REQUEST_PORTAL;
-               req->rq_reply_portal = OST_REPLY_PORTAL;
-               rc = ptl_send_rpc(req, peer);
-       }
-       if (rc) { 
-               CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc); 
-               return -rc;
-       }
-
-       CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
-              &conn->oc_dev->u.osc.osc_tgt->u.ost, 
-              conn->oc_id, req->rq_reqhdr->opc, req);
-
-       /* wait for the reply */
-       CDEBUG(D_INODE, "-- sleeping\n");
-        add_wait_queue(&req->rq_wait_for_rep, &wait);
-        while (req->rq_repbuf == NULL) {
-                set_current_state(TASK_INTERRUPTIBLE);
-
-                /* if this process really wants to die, let it go */
-                if (sigismember(&(current->pending.signal), SIGKILL) ||
-                    sigismember(&(current->pending.signal), SIGINT))
-                        break;
-
-                schedule();
-        }
-        remove_wait_queue(&req->rq_wait_for_rep, &wait);
-        set_current_state(TASK_RUNNING);
-       CDEBUG(D_INODE, "-- done\n");
-
-        if (req->rq_repbuf == NULL) {
-                /* We broke out because of a signal */
-                EXIT;
-                return -EINTR;
-        }
-
-       rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
-                           &req->rq_rep.ost); 
-       if (rc) {
-               CERROR("mds_unpack_rep failed: %d\n", rc);
-               return rc;
-       }
+       struct osc_obd *osc = &conn->oc_dev->u.osc;
+       return &osc->osc_peer;
 
-       if ( req->rq_rephdr->status == 0 )
-               CDEBUG(D_INODE, "buf %p len %d status %d\n", 
-                      req->rq_repbuf, req->rq_replen, 
-                      req->rq_rephdr->status); 
-
-       EXIT;
-       return 0;
-}
-
-static void osc_free_req(struct ptlrpc_request *request)
-{
-       OBD_FREE(request, sizeof(*request));
 }
 
 static int osc_connect(struct obd_conn *conn)
 {
        struct ptlrpc_request *request;
+       struct ptlrpc_client *peer = osc_con2cl(conn);
        int rc; 
        ENTRY;
        
-       request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
+       request = ptlrpc_prep_req(peer, OST_CONNECT, 0, NULL, 0, NULL);
        if (!request) { 
                CERROR("cannot pack req!\n"); 
                return -ENOMEM;
@@ -169,7 +67,7 @@ static int osc_connect(struct obd_conn *conn)
        request->rq_replen = 
                sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
 
-       rc = osc_queue_wait(conn, request);
+       rc = ptlrpc_queue_wait(peer, request);
        if (rc) { 
                EXIT;
                goto out;
@@ -179,7 +77,7 @@ static int osc_connect(struct obd_conn *conn)
 
        conn->oc_id = request->rq_rep.ost->connid;
  out:
-       osc_free_req(request);
+       ptlrpc_free_req(request);
        EXIT;
        return rc;
 }
@@ -187,25 +85,26 @@ static int osc_connect(struct obd_conn *conn)
 static int osc_disconnect(struct obd_conn *conn)
 {
        struct ptlrpc_request *request;
+       struct ptlrpc_client *peer = osc_con2cl(conn);
        int rc; 
        ENTRY;
        
-       request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
+       request = ptlrpc_prep_req(peer, OST_DISCONNECT, 0, NULL, 0, NULL);
        if (!request) { 
                CERROR("cannot pack req!\n"); 
                return -ENOMEM;
        }
-
+       request->rq_req.ost->connid = conn->oc_id;
        request->rq_replen = 
                sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
 
-       rc = osc_queue_wait(conn, request);
+       rc = ptlrpc_queue_wait(peer, request);
        if (rc) { 
                EXIT;
                goto out;
        }
  out:
-       osc_free_req(request);
+       ptlrpc_free_req(request);
        EXIT;
        return rc;
 }
@@ -214,9 +113,10 @@ static int osc_disconnect(struct obd_conn *conn)
 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
 {
        struct ptlrpc_request *request;
+       struct ptlrpc_client *peer = osc_con2cl(conn);
        int rc; 
 
-       request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
+       request = ptlrpc_prep_req(peer, OST_GETATTR, 0, NULL, 0, NULL);
        if (!request) { 
                CERROR("cannot pack req!\n"); 
                return -ENOMEM;
@@ -227,7 +127,7 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
        request->rq_replen = 
                sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
        
-       rc = osc_queue_wait(conn, request);
+       rc = ptlrpc_queue_wait(peer, request);
        if (rc) { 
                EXIT;
                goto out;
@@ -239,16 +139,17 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
        }
 
  out:
-       osc_free_req(request);
+       ptlrpc_free_req(request);
        return 0;
 }
 
 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
 {
        struct ptlrpc_request *request;
+       struct ptlrpc_client *peer = osc_con2cl(conn);
        int rc; 
 
-       request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
+       request = ptlrpc_prep_req(peer, OST_SETATTR, 0, NULL, 0, NULL);
        if (!request) { 
                CERROR("cannot pack req!\n"); 
                return -ENOMEM;
@@ -258,26 +159,27 @@ static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
        request->rq_replen = 
                sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
        
-       rc = osc_queue_wait(conn, request);
+       rc = ptlrpc_queue_wait(peer, request);
        if (rc) { 
                EXIT;
                goto out;
        }
 
  out:
-       osc_free_req(request);
+       ptlrpc_free_req(request);
        return 0;
 }
 
 static int osc_create(struct obd_conn *conn, struct obdo *oa)
 {
        struct ptlrpc_request *request;
+       struct ptlrpc_client *peer = osc_con2cl(conn);
        int rc; 
 
        if (!oa) { 
                CERROR("oa NULL\n"); 
        }
-       request = ost_prep_req(OST_CREATE, 0, NULL, 0, NULL);
+       request = ptlrpc_prep_req(peer, OST_CREATE, 0, NULL, 0, NULL);
        if (!request) { 
                CERROR("cannot pack req!\n"); 
                return -ENOMEM;
@@ -288,7 +190,7 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa)
        request->rq_replen = 
                sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
        
-       rc = osc_queue_wait(conn, request);
+       rc = ptlrpc_queue_wait(peer, request);
        if (rc) { 
                EXIT;
                goto out;
@@ -296,19 +198,21 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa)
        memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
 
  out:
-       osc_free_req(request);
+       ptlrpc_free_req(request);
        return 0;
 }
 
-static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd_off offset)
+static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
+                     obd_off offset)
 {
        struct ptlrpc_request *request;
+       struct ptlrpc_client *peer = osc_con2cl(conn);
        int rc; 
 
        if (!oa) { 
                CERROR("oa NULL\n"); 
        }
-       request = ost_prep_req(OST_PUNCH, 0, NULL, 0, NULL);
+       request = ptlrpc_prep_req(peer, OST_PUNCH, 0, NULL, 0, NULL);
        if (!request) { 
                CERROR("cannot pack req!\n"); 
                return -ENOMEM;
@@ -321,7 +225,7 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd
        request->rq_replen = 
                sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
        
-       rc = osc_queue_wait(conn, request);
+       rc = ptlrpc_queue_wait(peer, request);
        if (rc) { 
                EXIT;
                goto out;
@@ -329,19 +233,20 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd
        memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
 
  out:
-       osc_free_req(request);
+       ptlrpc_free_req(request);
        return 0;
 }
 
 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
 {
        struct ptlrpc_request *request;
+       struct ptlrpc_client *peer = osc_con2cl(conn);
        int rc; 
 
        if (!oa) { 
                CERROR("oa NULL\n"); 
        }
-       request = ost_prep_req(OST_DESTROY, 0, NULL, 0, NULL);
+       request = ptlrpc_prep_req(peer, OST_DESTROY, 0, NULL, 0, NULL);
        if (!request) { 
                CERROR("cannot pack req!\n"); 
                return -ENOMEM;
@@ -352,7 +257,7 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
        request->rq_replen = 
                sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
        
-       rc = osc_queue_wait(conn, request);
+       rc = ptlrpc_queue_wait(peer, request);
        if (rc) { 
                EXIT;
                goto out;
@@ -360,137 +265,216 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
        memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
 
  out:
-       osc_free_req(request);
+       ptlrpc_free_req(request);
        return 0;
 }
 
-
-/* mount the file system (secretly) */
-static int osc_setup(struct obd_device *obddev, obd_count len,
-                       void *buf)
-                       
-{
-       struct obd_ioctl_data* data = buf;
-       struct osc_obd *osc = &obddev->u.osc;
-        ENTRY;
-
-       if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
-               /* This is a local connection */
-               osc->osc_tgt = &obd_dev[data->ioc_dev];
-
-               CERROR("OSC: tgt %d ost at %p\n", data->ioc_dev,
-                      &osc->osc_tgt->u.ost);
-               if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
-                    ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
-                       CERROR("device not attached or not set up (%d)\n", 
-                              data->ioc_dev);
-                       EXIT;
-                       return -EINVAL;
-               }
-       } else {
-               int err;
-               /* This is a remote connection using Portals */
-
-               /* XXX: this should become something like ioc_inlbuf1 */
-               err = kportal_uuid_to_peer("ost", &osc->osc_peer);
-               if (err != 0) {
-                       CERROR("Cannot find 'ost' peer.\n");
-                       EXIT;
-                       return -EINVAL;
-               }
-       }
-
-        MOD_INC_USE_COUNT;
-        EXIT;
-        return 0;
-} 
-
-int osc_sendpage(struct ptlrpc_request *req, struct niobuf *dst,
-                 struct niobuf *src)
+int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
+                 struct niobuf *dst, struct niobuf *src)
 {
-        if (req->rq_peer.peer_nid == 0) {
+        if (conn->oc_id != -1) {
                 /* local sendpage */
                 memcpy((char *)(unsigned long)dst->addr,
                        (char *)(unsigned long)src->addr, src->len);
         } else {
+                struct ptlrpc_client *cl = osc_con2cl(conn);
+                struct ptlrpc_bulk_desc *bulk;
                char *buf;
                 int rc;
 
+                bulk = ptlrpc_prep_bulk(&cl->cli_server);
+                if (bulk == NULL)
+                        return -ENOMEM;
+
+                spin_lock(&cl->cli_lock);
+                bulk->b_xid = cl->cli_xid++;
+                spin_unlock(&cl->cli_lock);
+
                OBD_ALLOC(buf, src->len);
-               if (!buf)
+               if (!buf) {
+                        OBD_FREE(bulk, sizeof(*bulk));
                        return -ENOMEM;
+                }
 
                 memcpy(buf, (char *)(unsigned long)src->addr, src->len);
 
-                req->rq_type = PTLRPC_BULK;
-                req->rq_bulkbuf = buf;
-                req->rq_bulklen = src->len;
-                rc = ptl_send_buf(req, &req->rq_peer, OST_BULK_PORTAL);
-                init_waitqueue_head(&req->rq_wait_for_bulk);
-                sleep_on(&req->rq_wait_for_bulk);
+                bulk->b_buf = buf;
+                bulk->b_buflen = src->len;
+                /* FIXME: maybe we should add an XID to struct niobuf? */
+                bulk->b_xid = (__u32)(unsigned long)src->page;
+
+               rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
+                if (rc != 0) {
+                        CERROR("send_bulk failed: %d\n", rc);
+                        BUG();
+                        return rc;
+                }
+                wait_event_interruptible(bulk->b_waitq,
+                                         ptlrpc_check_bulk_sent(bulk));
+
+                if (bulk->b_flags == PTL_RPC_INTR) {
+                        EXIT;
+                        /* FIXME: hey hey, we leak here. */
+                        return -EINTR;
+                }
+
+                OBD_FREE(bulk, sizeof(*bulk));
                 OBD_FREE(buf, src->len);
-                req->rq_bulklen = 0; /* FIXME: eek. */
         }
 
         return 0;
 }
 
-
-int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
-             struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-             obd_size *count, obd_off *offset, obd_flag *flags)
+int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
+                 obd_count *oa_bufs, struct page **buf, obd_size *count,
+                 obd_off *offset, obd_flag *flags)
 {
-       struct ptlrpc_request *request;
+       struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_request *request;
+        int pages;
        int rc; 
        struct obd_ioobj ioo;
        struct niobuf src;
        int size1, size2 = 0; 
        void *ptr1, *ptr2;
        int i, j, n;
+        struct ptlrpc_bulk_desc **bulk;
 
        size1 = num_oa * sizeof(ioo); 
+        pages = 0;
        for (i = 0; i < num_oa; i++) { 
                size2 += oa_bufs[i] * sizeof(src);
+                pages += oa_bufs[i];
        }
 
-       request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL);
+        /* We actually pack a _third_ buffer, with XIDs for bulk pages */
+        size2 += pages * sizeof(__u32);
+       request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
        if (!request) { 
                CERROR("cannot pack req!\n"); 
                return -ENOMEM;
        }
+        request->rq_req.ost->cmd = OBD_BRW_READ;
+
+        OBD_ALLOC(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
+        if (bulk == NULL) {
+                CERROR("cannot alloc bulk desc vector\n");
+                return -ENOMEM;
+        }
+        memset(bulk, 0, pages * sizeof(struct ptlrpc_bulk_desc *));
+
+        n = 0;
+        ptr1 = ost_req_buf1(request->rq_req.ost);
+        ptr2 = ost_req_buf2(request->rq_req.ost);
+        for (i = 0; i < num_oa; i++) {
+                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
+                for (j = 0; j < oa_bufs[i]; j++) {
+                        bulk[n] = ptlrpc_prep_bulk(&cl->cli_server);
+                        if (bulk[n] == NULL) {
+                                CERROR("cannot alloc bulk desc\n");
+                                rc = -ENOMEM;
+                                goto out;
+                        }
+
+                        spin_lock(&cl->cli_lock);
+                        bulk[n]->b_xid = cl->cli_xid++;
+                        spin_unlock(&cl->cli_lock);
+                        bulk[n]->b_buf = kmap(buf[n]);
+                        bulk[n]->b_buflen = PAGE_SIZE;
+                        bulk[n]->b_portal = OST_BULK_PORTAL;
+                        ost_pack_niobuf(&ptr2, bulk[n]->b_buf, offset[n],
+                                        count[n], flags[n]);
+                        n++;
+                }
+        }
+
+        /* This is kinda silly--put the XIDs in the "third" buffer. */
+        for (n = 0; n < pages; n++) {
+                *(__u32 *)ptr2 = bulk[n]->b_xid;
+                ptr2 = (char *)ptr2 + sizeof(__u32);
+
+                rc = ptlrpc_wait_bulk(bulk[n]);
+                if (rc)
+                        goto out;
+        }
+
+        request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+        rc = ptlrpc_queue_wait(cl, request);
+
+ out:
+        /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
+         * abort those bulk listeners. */
+
+        if (request->rq_rephdr)
+                OBD_FREE(request->rq_rephdr, request->rq_replen);
+        n = 0;
+        for (i = 0; i < num_oa; i++) {
+                for (j = 0; j < oa_bufs[i]; j++) {
+                        if (bulk[n] == NULL)
+                                continue;
+                        kunmap(bulk[n]->b_buf);
+                        OBD_FREE(bulk[n], sizeof(struct ptlrpc_bulk_desc));
+                        n++;
+                }
+        }
+
+        OBD_FREE(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
+        ptlrpc_free_req(request);
+        return rc;
+}
+
+int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
+                  obd_count *oa_bufs, struct page **buf, obd_size *count,
+                  obd_off *offset, obd_flag *flags)
+{
+       struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_request *request;
+       struct obd_ioobj ioo;
+       struct niobuf src;
+       int pages, rc, i, j, n, size1, size2 = 0; 
+       void *ptr1, *ptr2;
+
+       size1 = num_oa * sizeof(ioo); 
+        pages = 0;
+       for (i = 0; i < num_oa; i++) { 
+               size2 += oa_bufs[i] * sizeof(src);
+                pages += oa_bufs[i];
+       }
+
+       request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
+       if (!request) { 
+               CERROR("cannot pack req!\n"); 
+               return -ENOMEM;
+       }
+        request->rq_req.ost->cmd = OBD_BRW_WRITE;
 
        n = 0;
-       request->rq_req.ost->cmd = rw;
        ptr1 = ost_req_buf1(request->rq_req.ost);
        ptr2 = ost_req_buf2(request->rq_req.ost);
         for (i = 0; i < num_oa; i++) {
                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
                 for (j = 0; j < oa_bufs[i]; j++) {
-                       ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
-                                       count[n], flags[n]); 
+                        ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
+                                        count[n], flags[n]);
                        n++;
                }
        }
 
-        request->rq_bulk_portal = OST_BULK_PORTAL;
-       request->rq_replen = 
-               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2;
-
-       rc = osc_queue_wait(conn, request);
+       request->rq_replen = sizeof(struct ptlrep_hdr) +
+                sizeof(struct ost_rep) + pages * sizeof(struct niobuf);
+       rc = ptlrpc_queue_wait(cl, request);
        if (rc) { 
                EXIT;
                goto out;
        }
 
-#if 0
-       ptr2 = ost_rep_buf2(request->rq_rep.ost); 
-       if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { 
-               CERROR("buffer length wrong\n"); 
-               goto out;
-       }
-
-       if (rw == OBD_BRW_READ)
-               goto out;
+        ptr2 = ost_rep_buf2(request->rq_rep.ost);
+        if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) {
+                CERROR("buffer length wrong (%d vs. %d)\n",
+                       request->rq_rep.ost->buflen2, n * sizeof(struct niobuf));
+                EXIT;
+                goto out;
+        }
 
         for (i = 0; i < num_oa; i++) {
                 for (j = 0; j < oa_bufs[i]; j++) {
@@ -498,11 +482,22 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                        src.addr = (__u64)(unsigned long)buf[n];
                        src.len = count[n];
                        ost_unpack_niobuf(&ptr2, &dst);
-                       osc_sendpage(request, dst, &src);
+                       osc_sendpage(conn, request, dst, &src);
                        n++;
                }
        }
-#endif
+
+        /* Reuse the request structure for the completion request. */
+        OBD_FREE(request->rq_rephdr, request->rq_replen);
+        request->rq_rephdr = NULL;
+        request->rq_repbuf = NULL;
+       request->rq_reqhdr->opc = OST_BRW_COMPLETE;
+       request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+       rc = ptlrpc_queue_wait(cl, request);
+       if (rc) { 
+               EXIT;
+               goto out;
+       }
 
  out:
        if (request->rq_rephdr)
@@ -515,10 +510,46 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                }
        }
 
-       osc_free_req(request);
+       ptlrpc_free_req(request);
        return 0;
 }
 
+int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
+             struct obdo **oa, obd_count *oa_bufs, struct page **buf,
+             obd_size *count, obd_off *offset, obd_flag *flags)
+{
+        if (rw == OBD_BRW_READ) {
+                return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
+                                    offset, flags);
+        } else {
+                return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
+                                     offset, flags);
+        }
+}
+
+/* mount the file system (secretly) */
+static int osc_setup(struct obd_device *obddev, obd_count len,
+                       void *buf)
+                       
+{
+       struct osc_obd *osc = &obddev->u.osc;
+       struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
+       int rc;
+       int dev = data->ioc_dev;
+        ENTRY;
+
+       rc = ptlrpc_connect_client(dev, "ost", 
+                                  OST_REQUEST_PORTAL, 
+                                  OSC_REPLY_PORTAL,    
+                                  ost_pack_req, 
+                                  ost_unpack_rep,
+                                  &osc->osc_peer); 
+
+        MOD_INC_USE_COUNT;
+        EXIT;
+        return rc;
+} 
+
 static int osc_cleanup(struct obd_device * obddev)
 {
         MOD_DEC_USE_COUNT;
@@ -555,4 +586,3 @@ MODULE_LICENSE("GPL");
 
 module_init(osc_init);
 module_exit(osc_exit);
-