Whamcloud - gitweb
*** empty log message ***
authorpschwan <pschwan>
Sun, 3 Mar 2002 05:46:50 +0000 (05:46 +0000)
committerpschwan <pschwan>
Sun, 3 Mar 2002 05:46:50 +0000 (05:46 +0000)
lustre/osc/osc_request.c
lustre/ost/ost_handler.c

index 824d36a..1b93efe 100644 (file)
@@ -273,37 +273,23 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
                  struct niobuf *dst, struct niobuf *src)
 {
-        if (conn->oc_id != -1) {
+        struct ptlrpc_client *cl = osc_con2cl(conn);
+
+        if (cl->cli_obd) {
                 /* local sendpage */
                 memcpy((char *)(unsigned long)dst->addr,
                        (char *)(unsigned long)src->addr, src->len);
         } else {
-                struct ptlrpc_client *cl = osc_con2cl(conn);
                 struct ptlrpc_bulk_desc *bulk;
-               char *buf;
                 int rc;
 
                 bulk = ptlrpc_prep_bulk(&cl->cli_server);
                 if (bulk == NULL)
                         return -ENOMEM;
 
-                spin_lock(&cl->cli_lock);
-                bulk->b_xid = cl->cli_xid++;
-                spin_unlock(&cl->cli_lock);
-
-               OBD_ALLOC(buf, src->len);
-               if (!buf) {
-                        OBD_FREE(bulk, sizeof(*bulk));
-                       return -ENOMEM;
-                }
-
-                memcpy(buf, (char *)(unsigned long)src->addr, src->len);
-
-                bulk->b_buf = buf;
+                bulk->b_buf = (void *)(unsigned long)src->addr;
                 bulk->b_buflen = src->len;
-                /* FIXME: maybe we should add an XID to struct niobuf? */
-                bulk->b_xid = (__u32)(unsigned long)src->page;
-
+                bulk->b_xid = dst->xid;
                rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
                 if (rc != 0) {
                         CERROR("send_bulk failed: %d\n", rc);
@@ -320,7 +306,6 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
                 }
 
                 OBD_FREE(bulk, sizeof(*bulk));
-                OBD_FREE(buf, src->len);
         }
 
         return 0;
@@ -429,11 +414,11 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                   obd_off *offset, obd_flag *flags)
 {
        struct ptlrpc_client *cl = osc_con2cl(conn);
-        struct ptlrpc_request *request;
+        struct ptlrpc_request *request, *req2 = NULL;
        struct obd_ioobj ioo;
        struct niobuf src;
        int pages, rc, i, j, n, size1, size2 = 0; 
-       void *ptr1, *ptr2;
+       void *ptr1, *ptr2, *reqbuf;
 
        size1 = num_oa * sizeof(ioo); 
         pages = 0;
@@ -447,6 +432,11 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                CERROR("cannot pack req!\n"); 
                return -ENOMEM;
        }
+        OBD_ALLOC(reqbuf, request->rq_reqlen);
+        if (reqbuf == NULL) {
+                CERROR("cannot make duplicate buffer\n");
+                return -ENOMEM;
+        }
         request->rq_req.ost->cmd = OBD_BRW_WRITE;
 
        n = 0;
@@ -461,6 +451,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                }
        }
 
+        memcpy(reqbuf, request->rq_reqbuf, request->rq_reqlen);
        request->rq_replen = sizeof(struct ptlrep_hdr) +
                 sizeof(struct ost_rep) + pages * sizeof(struct niobuf);
        rc = ptlrpc_queue_wait(cl, request);
@@ -477,6 +468,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                 goto out;
         }
 
+        n = 0;
         for (i = 0; i < num_oa; i++) {
                 for (j = 0; j < oa_bufs[i]; j++) {
                        struct niobuf *dst;
@@ -488,13 +480,17 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                }
        }
 
-        /* Reuse the request structure for the completion request. */
-        OBD_FREE(request->rq_rephdr, request->rq_replen);
-        request->rq_rephdr = NULL;
-        request->rq_repbuf = NULL;
-       request->rq_reqhdr->opc = OST_BRW_COMPLETE;
-       request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
-       rc = ptlrpc_queue_wait(cl, request);
+        ptr2 = ost_rep_buf2(request->rq_rep.ost);
+       req2 = ptlrpc_prep_req(cl, OST_BRW_COMPLETE, size1, ptr1,
+                               request->rq_rep.ost->buflen2, ptr2);
+       if (!req2) { 
+               CERROR("cannot pack second request!\n"); 
+               return -ENOMEM;
+       }
+
+       req2->rq_reqhdr->opc = OST_BRW_COMPLETE;
+       req2->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+       rc = ptlrpc_queue_wait(cl, req2);
        if (rc) { 
                EXIT;
                goto out;
@@ -503,6 +499,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
  out:
        if (request->rq_rephdr)
                OBD_FREE(request->rq_rephdr, request->rq_replen);
+       if (req2 && req2->rq_rephdr)
+               OBD_FREE(req2->rq_rephdr, req2->rq_replen);
        n = 0;
         for (i = 0; i < num_oa; i++) {
                 for (j = 0; j < oa_bufs[i]; j++) {
@@ -511,6 +509,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                }
        }
 
+        if (req2)
+                ptlrpc_free_req(req2);
        ptlrpc_free_req(request);
        return 0;
 }
index 4e128bf..46b2913 100644 (file)
@@ -258,7 +258,7 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
        return 0;
 }
 
-int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
         struct ptlrpc_bulk_desc **bulk_vec = NULL;
         struct ptlrpc_bulk_desc *bulk = NULL;
@@ -267,9 +267,9 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
        int i, j;
        int objcount, niocount;
        char *tmp1, *tmp2, *end2;
-       char *res;
+       char *res = NULL;
        int cmd;
-       struct niobuf *nb, *src, *dst;
+       struct niobuf *nb, *src;
        struct obd_ioobj *ioo;
        struct ost_req *r = req->rq_req.ost;
 
@@ -296,10 +296,10 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
                }
        }
 
-        rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
+        rc = ost_pack_rep(NULL, 0, NULL, 0,
                           &req->rq_rephdr, &req->rq_rep,
                           &req->rq_replen, &req->rq_repbuf);
-       if (rc) { 
+       if (rc) {
                CERROR("cannot pack reply\n"); 
                return rc;
        }
@@ -321,93 +321,162 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
                 goto out;
        }
 
-        if (cmd == OBD_BRW_WRITE) {
-                /* Setup buffers for the incoming pages, then send the niobufs
-                 * describing those buffers to the OSC. */
-                OBD_ALLOC(bulk_vec,
-                          niocount * sizeof(struct ptlrpc_bulk_desc *));
-                if (bulk_vec == NULL) {
-                        CERROR("cannot alloc bulk desc vector\n");
-                        return -ENOMEM;
+        for (i = 0; i < niocount; i++) {
+                bulk = ptlrpc_prep_bulk(&req->rq_peer);
+                if (bulk == NULL) {
+                        CERROR("cannot alloc bulk desc\n");
+                        rc = -ENOMEM;
+                        goto out;
                 }
-                memset(bulk_vec, 0,
-                       niocount * sizeof(struct ptlrpc_bulk_desc *));
 
-                for (i = 0; i < niocount; i++) {
-                        struct ptlrpc_service *srv =
-                                req->rq_obd->u.ost.ost_service;
-
-                        bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
-                        if (bulk_vec[i] == NULL) {
-                                CERROR("cannot alloc bulk desc\n");
-                                rc = -ENOMEM;
-                                goto out;
-                        }
-
-                        spin_lock(&srv->srv_lock);
-                        bulk_vec[i]->b_xid = srv->srv_xid++;
-                        spin_unlock(&srv->srv_lock);
-
-                       dst = &((struct niobuf *)res)[i];
-                        /* FIXME: we overload ->page with the xid of this buffer
-                         * for the benefit of the remote client */
-                        dst->page =
-                                (void *)(unsigned long)HTON__u64(bulk_vec[i]->b_xid);
-
-                        bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
-                        bulk_vec[i]->b_buflen = PAGE_SIZE;
-                        bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
-                        rc = ptlrpc_register_bulk(bulk_vec[i]);
-                        if (rc)
-                                goto out;
+                src = &((struct niobuf *)tmp2)[i];
+
+                bulk->b_xid = src->xid;
+                bulk->b_buf = (void *)(unsigned long)src->addr;
+                bulk->b_buflen = PAGE_SIZE;
+                rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
+                if (rc) {
+                        EXIT;
+                        goto out;
+                }
+                wait_event_interruptible(bulk->b_waitq,
+                                         ptlrpc_check_bulk_sent(bulk));
+
+                if (bulk->b_flags == PTL_RPC_INTR) {
+                        EXIT;
+                        goto out;
+                }
+
+                OBD_FREE(bulk, sizeof(*bulk));
+                bulk = NULL;
+        }
 
 #if 0
-                        /* Local delivery */
-                       src = &((struct niobuf *)tmp2)[i];
-                       memcpy((void *)(unsigned long)dst->addr, 
-                               (void *)(unsigned long)src->addr, src->len);
+        /* Local delivery */
+        dst = &((struct niobuf *)tmp2)[i];
+        memcpy((void *)(unsigned long)dst->addr,
+               (void *)(unsigned long)src->addr, PAGE_SIZE);
 #endif
-               }
-               barrier();
-       } else {
+        barrier();
+
+ out:
+        if (res != NULL)
+                OBD_FREE(res, sizeof(struct niobuf) * niocount);
+        if (bulk != NULL)
+                OBD_FREE(bulk, sizeof(*bulk));
+        if (bulk_vec != NULL) {
                 for (i = 0; i < niocount; i++) {
-                        bulk = ptlrpc_prep_bulk(&req->rq_peer);
-                        if (bulk == NULL) {
-                                CERROR("cannot alloc bulk desc\n");
-                                rc = -ENOMEM;
-                                goto out;
-                        }
-
-                       src = &((struct niobuf *)tmp2)[i];
-
-                        bulk->b_xid = src->xid;
-                        bulk->b_buf = (void *)(unsigned long)src->addr;
-                        bulk->b_buflen = PAGE_SIZE;
-                        rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
-                        if (rc) {
-                                EXIT;
-                                goto out;
-                        }
-                        wait_event_interruptible(bulk->b_waitq,
-                                                 ptlrpc_check_bulk_sent(bulk));
-
-                        if (bulk->b_flags == PTL_RPC_INTR) {
-                                EXIT;
-                                goto out;
-                        }
-
-                        OBD_FREE(bulk, sizeof(*bulk));
-                        bulk = NULL;
+                        if (bulk_vec[i] != NULL)
+                                OBD_FREE(bulk_vec[i], sizeof(*bulk));
                 }
+                OBD_FREE(bulk_vec,
+                         niocount * sizeof(struct ptlrpc_bulk_desc *));
+        }
+
+       EXIT;
+       return 0;
+}
+
+int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
+{
+        struct ptlrpc_bulk_desc **bulk_vec = NULL;
+        struct ptlrpc_bulk_desc *bulk = NULL;
+       struct obd_conn conn; 
+       int rc;
+       int i, j;
+       int objcount, niocount;
+       char *tmp1, *tmp2, *end2;
+       char *res;
+       int cmd;
+       struct niobuf *nb, *dst;
+       struct obd_ioobj *ioo;
+       struct ost_req *r = req->rq_req.ost;
+
+       ENTRY;
+       
+       tmp1 = ost_req_buf1(r);
+       tmp2 = ost_req_buf2(r);
+       end2 = tmp2 + req->rq_req.ost->buflen2;
+       objcount = r->buflen1 / sizeof(*ioo); 
+       niocount = r->buflen2 / sizeof(*nb); 
+       cmd = r->cmd;
+
+       conn.oc_id = req->rq_req.ost->connid;
+       conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+
+        for (i = 0; i < objcount; i++) {
+               ost_unpack_ioo((void *)&tmp1, &ioo);
+               if (tmp2 + ioo->ioo_bufcnt > end2) { 
+                       rc = -EFAULT;
+                       break; 
+               }
+                for (j = 0; j < ioo->ioo_bufcnt; j++) {
+                       ost_unpack_niobuf((void *)&tmp2, &nb); 
+               }
+       }
+
+        rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
+                          &req->rq_rephdr, &req->rq_rep,
+                          &req->rq_replen, &req->rq_repbuf);
+       if (rc) { 
+               CERROR("cannot pack reply\n"); 
+               return rc;
+       }
+        res = ost_rep_buf2(req->rq_rep.ost);
+
+       /* The unpackers move tmp1 and tmp2, so reset them before using */
+       tmp1 = ost_req_buf1(r);
+       tmp2 = ost_req_buf2(r);
+       req->rq_rep.ost->result = obd_preprw
+               (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
+                niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
+
+       if (req->rq_rep.ost->result) {
+               EXIT;
+                goto out;
+       }
+
+        /* Setup buffers for the incoming pages, then send the niobufs
+         * describing those buffers to the OSC. */
+        OBD_ALLOC(bulk_vec, niocount * sizeof(struct ptlrpc_bulk_desc *));
+        if (bulk_vec == NULL) {
+                CERROR("cannot alloc bulk desc vector\n");
+                return -ENOMEM;
+        }
+        memset(bulk_vec, 0, niocount * sizeof(struct ptlrpc_bulk_desc *));
+
+        for (i = 0; i < niocount; i++) {
+                struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+
+                bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
+                if (bulk_vec[i] == NULL) {
+                        CERROR("cannot alloc bulk desc\n");
+                        rc = -ENOMEM;
+                        goto out;
+                }
+
+                spin_lock(&srv->srv_lock);
+                bulk_vec[i]->b_xid = srv->srv_xid++;
+                spin_unlock(&srv->srv_lock);
+
+                dst = &((struct niobuf *)res)[i];
+                dst->xid = HTON__u32(bulk_vec[i]->b_xid);
+
+                bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
+                bulk_vec[i]->b_buflen = PAGE_SIZE;
+                bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
+                rc = ptlrpc_register_bulk(bulk_vec[i]);
+                if (rc)
+                        goto out;
 
 #if 0
                 /* Local delivery */
-                dst = &((struct niobuf *)tmp2)[i];
-                memcpy((void *)(unsigned long)dst->addr, 
-                       (void *)(unsigned long)src->addr, PAGE_SIZE);
+                src = &((struct niobuf *)tmp2)[i];
+                memcpy((void *)(unsigned long)dst->addr,
+                       (void *)(unsigned long)src->addr, src->len);
 #endif
-               barrier();
-       }
+        }
+        barrier();
 
  out:
         if (bulk != NULL)
@@ -425,6 +494,17 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
        return 0;
 }
 
+int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+{
+       struct ost_req *r = req->rq_req.ost;
+       int cmd = r->cmd;
+
+        if (cmd == OBD_BRW_READ)
+                return ost_brw_read(obddev, req);
+        else
+                return ost_brw_write(obddev, req);
+}
+
 int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
        struct obd_conn conn;