Whamcloud - gitweb
*** empty log message ***
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index 4e128bf..46b2913 100644 (file)
@@ -258,7 +258,7 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
        return 0;
 }
 
-int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
         struct ptlrpc_bulk_desc **bulk_vec = NULL;
         struct ptlrpc_bulk_desc *bulk = NULL;
@@ -267,9 +267,9 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
        int i, j;
        int objcount, niocount;
        char *tmp1, *tmp2, *end2;
-       char *res;
+       char *res = NULL;
        int cmd;
-       struct niobuf *nb, *src, *dst;
+       struct niobuf *nb, *src;
        struct obd_ioobj *ioo;
        struct ost_req *r = req->rq_req.ost;
 
@@ -296,10 +296,10 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
                }
        }
 
-        rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
+        rc = ost_pack_rep(NULL, 0, NULL, 0,
                           &req->rq_rephdr, &req->rq_rep,
                           &req->rq_replen, &req->rq_repbuf);
-       if (rc) { 
+       if (rc) {
                CERROR("cannot pack reply\n"); 
                return rc;
        }
@@ -321,93 +321,162 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
                 goto out;
        }
 
-        if (cmd == OBD_BRW_WRITE) {
-                /* Setup buffers for the incoming pages, then send the niobufs
-                 * describing those buffers to the OSC. */
-                OBD_ALLOC(bulk_vec,
-                          niocount * sizeof(struct ptlrpc_bulk_desc *));
-                if (bulk_vec == NULL) {
-                        CERROR("cannot alloc bulk desc vector\n");
-                        return -ENOMEM;
+        for (i = 0; i < niocount; i++) {
+                bulk = ptlrpc_prep_bulk(&req->rq_peer);
+                if (bulk == NULL) {
+                        CERROR("cannot alloc bulk desc\n");
+                        rc = -ENOMEM;
+                        goto out;
                 }
-                memset(bulk_vec, 0,
-                       niocount * sizeof(struct ptlrpc_bulk_desc *));
 
-                for (i = 0; i < niocount; i++) {
-                        struct ptlrpc_service *srv =
-                                req->rq_obd->u.ost.ost_service;
-
-                        bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
-                        if (bulk_vec[i] == NULL) {
-                                CERROR("cannot alloc bulk desc\n");
-                                rc = -ENOMEM;
-                                goto out;
-                        }
-
-                        spin_lock(&srv->srv_lock);
-                        bulk_vec[i]->b_xid = srv->srv_xid++;
-                        spin_unlock(&srv->srv_lock);
-
-                       dst = &((struct niobuf *)res)[i];
-                        /* FIXME: we overload ->page with the xid of this buffer
-                         * for the benefit of the remote client */
-                        dst->page =
-                                (void *)(unsigned long)HTON__u64(bulk_vec[i]->b_xid);
-
-                        bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
-                        bulk_vec[i]->b_buflen = PAGE_SIZE;
-                        bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
-                        rc = ptlrpc_register_bulk(bulk_vec[i]);
-                        if (rc)
-                                goto out;
+                src = &((struct niobuf *)tmp2)[i];
+
+                bulk->b_xid = src->xid;
+                bulk->b_buf = (void *)(unsigned long)src->addr;
+                bulk->b_buflen = PAGE_SIZE;
+                rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
+                if (rc) {
+                        EXIT;
+                        goto out;
+                }
+                wait_event_interruptible(bulk->b_waitq,
+                                         ptlrpc_check_bulk_sent(bulk));
+
+                if (bulk->b_flags == PTL_RPC_INTR) {
+                        EXIT;
+                        goto out;
+                }
+
+                OBD_FREE(bulk, sizeof(*bulk));
+                bulk = NULL;
+        }
 
 #if 0
-                        /* Local delivery */
-                       src = &((struct niobuf *)tmp2)[i];
-                       memcpy((void *)(unsigned long)dst->addr, 
-                               (void *)(unsigned long)src->addr, src->len);
+        /* Local delivery */
+        dst = &((struct niobuf *)tmp2)[i];
+        memcpy((void *)(unsigned long)dst->addr,
+               (void *)(unsigned long)src->addr, PAGE_SIZE);
 #endif
-               }
-               barrier();
-       } else {
+        barrier();
+
+ out:
+        if (res != NULL)
+                OBD_FREE(res, sizeof(struct niobuf) * niocount);
+        if (bulk != NULL)
+                OBD_FREE(bulk, sizeof(*bulk));
+        if (bulk_vec != NULL) {
                 for (i = 0; i < niocount; i++) {
-                        bulk = ptlrpc_prep_bulk(&req->rq_peer);
-                        if (bulk == NULL) {
-                                CERROR("cannot alloc bulk desc\n");
-                                rc = -ENOMEM;
-                                goto out;
-                        }
-
-                       src = &((struct niobuf *)tmp2)[i];
-
-                        bulk->b_xid = src->xid;
-                        bulk->b_buf = (void *)(unsigned long)src->addr;
-                        bulk->b_buflen = PAGE_SIZE;
-                        rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
-                        if (rc) {
-                                EXIT;
-                                goto out;
-                        }
-                        wait_event_interruptible(bulk->b_waitq,
-                                                 ptlrpc_check_bulk_sent(bulk));
-
-                        if (bulk->b_flags == PTL_RPC_INTR) {
-                                EXIT;
-                                goto out;
-                        }
-
-                        OBD_FREE(bulk, sizeof(*bulk));
-                        bulk = NULL;
+                        if (bulk_vec[i] != NULL)
+                                OBD_FREE(bulk_vec[i], sizeof(*bulk));
                 }
+                OBD_FREE(bulk_vec,
+                         niocount * sizeof(struct ptlrpc_bulk_desc *));
+        }
+
+       EXIT;
+       return 0;
+}
+
+int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
+{
+        struct ptlrpc_bulk_desc **bulk_vec = NULL;
+        struct ptlrpc_bulk_desc *bulk = NULL;
+       struct obd_conn conn; 
+       int rc;
+       int i, j;
+       int objcount, niocount;
+       char *tmp1, *tmp2, *end2;
+       char *res;
+       int cmd;
+       struct niobuf *nb, *dst;
+       struct obd_ioobj *ioo;
+       struct ost_req *r = req->rq_req.ost;
+
+       ENTRY;
+       
+       tmp1 = ost_req_buf1(r);
+       tmp2 = ost_req_buf2(r);
+       end2 = tmp2 + req->rq_req.ost->buflen2;
+       objcount = r->buflen1 / sizeof(*ioo); 
+       niocount = r->buflen2 / sizeof(*nb); 
+       cmd = r->cmd;
+
+       conn.oc_id = req->rq_req.ost->connid;
+       conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+
+        for (i = 0; i < objcount; i++) {
+               ost_unpack_ioo((void *)&tmp1, &ioo);
+               if (tmp2 + ioo->ioo_bufcnt > end2) { 
+                       rc = -EFAULT;
+                       break; 
+               }
+                for (j = 0; j < ioo->ioo_bufcnt; j++) {
+                       ost_unpack_niobuf((void *)&tmp2, &nb); 
+               }
+       }
+
+        rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
+                          &req->rq_rephdr, &req->rq_rep,
+                          &req->rq_replen, &req->rq_repbuf);
+       if (rc) { 
+               CERROR("cannot pack reply\n"); 
+               return rc;
+       }
+        res = ost_rep_buf2(req->rq_rep.ost);
+
+       /* The unpackers move tmp1 and tmp2, so reset them before using */
+       tmp1 = ost_req_buf1(r);
+       tmp2 = ost_req_buf2(r);
+       req->rq_rep.ost->result = obd_preprw
+               (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
+                niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
+
+       if (req->rq_rep.ost->result) {
+               EXIT;
+                goto out;
+       }
+
+        /* Setup buffers for the incoming pages, then send the niobufs
+         * describing those buffers to the OSC. */
+        OBD_ALLOC(bulk_vec, niocount * sizeof(struct ptlrpc_bulk_desc *));
+        if (bulk_vec == NULL) {
+                CERROR("cannot alloc bulk desc vector\n");
+                return -ENOMEM;
+        }
+        memset(bulk_vec, 0, niocount * sizeof(struct ptlrpc_bulk_desc *));
+
+        for (i = 0; i < niocount; i++) {
+                struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+
+                bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
+                if (bulk_vec[i] == NULL) {
+                        CERROR("cannot alloc bulk desc\n");
+                        rc = -ENOMEM;
+                        goto out;
+                }
+
+                spin_lock(&srv->srv_lock);
+                bulk_vec[i]->b_xid = srv->srv_xid++;
+                spin_unlock(&srv->srv_lock);
+
+                dst = &((struct niobuf *)res)[i];
+                dst->xid = HTON__u32(bulk_vec[i]->b_xid);
+
+                bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
+                bulk_vec[i]->b_buflen = PAGE_SIZE;
+                bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
+                rc = ptlrpc_register_bulk(bulk_vec[i]);
+                if (rc)
+                        goto out;
 
 #if 0
                 /* Local delivery */
-                dst = &((struct niobuf *)tmp2)[i];
-                memcpy((void *)(unsigned long)dst->addr, 
-                       (void *)(unsigned long)src->addr, PAGE_SIZE);
+                src = &((struct niobuf *)tmp2)[i];
+                memcpy((void *)(unsigned long)dst->addr,
+                       (void *)(unsigned long)src->addr, src->len);
 #endif
-               barrier();
-       }
+        }
+        barrier();
 
  out:
         if (bulk != NULL)
@@ -425,6 +494,17 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
        return 0;
 }
 
+int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+{
+       struct ost_req *r = req->rq_req.ost;
+       int cmd = r->cmd;
+
+        if (cmd == OBD_BRW_READ)
+                return ost_brw_read(obddev, req);
+        else
+                return ost_brw_write(obddev, req);
+}
+
 int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
        struct obd_conn conn;