Whamcloud - gitweb
OSC/OST bulk writing. TOTALLY UNTESTED. I am about to merge this branch back
authorpschwan <pschwan>
Sun, 3 Mar 2002 02:24:49 +0000 (02:24 +0000)
committerpschwan <pschwan>
Sun, 3 Mar 2002 02:24:49 +0000 (02:24 +0000)
to the tip, also, which is going to break the tip.  You have been warned.

lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/service.c

index e67378e..8fc5669 100644 (file)
@@ -72,6 +72,7 @@ int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal,
         int err; 
 
         memset(cl, 0, sizeof(*cl));
+        spin_lock_init(&cl->cli_lock);
        cl->cli_xid = 1;
        cl->cli_obd = NULL; 
        cl->cli_request_portal = req_portal;
@@ -106,6 +107,20 @@ int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal,
         return err;
 }
 
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer)
+{
+        struct ptlrpc_bulk_desc *bulk;
+
+        OBD_ALLOC(bulk, sizeof(*bulk));
+        if (bulk != NULL) {
+                memset(bulk, 0, sizeof(*bulk));
+                memcpy(&bulk->b_peer, peer, sizeof(*peer));
+                init_waitqueue_head(&bulk->b_waitq);
+        }
+
+        return bulk;
+}
+
 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, 
                                        int opcode, int namelen, char *name,
                                        int tgtlen, char *tgt)
@@ -121,7 +136,10 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
        }
 
        memset(request, 0, sizeof(*request));
+
+        spin_lock(&cl->cli_lock);
        request->rq_xid = cl->cli_xid++;
+        spin_unlock(&cl->cli_lock);
 
        rc = cl->cli_req_pack(name, namelen, tgt, tgtlen,
                          &request->rq_reqhdr, &request->rq_req,
@@ -157,7 +175,6 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
                 return 1;
         }
 
-        CERROR("no event yet\n"); 
         return 0;
 }
 
@@ -172,13 +189,6 @@ int ptlrpc_abort(struct ptlrpc_request *request)
         request->rq_repbuf = NULL;
         request->rq_replen = 0;
 
-        if (request->rq_bulklen != 0) {
-                PtlMEUnlink(request->rq_bulk_me_h);
-                PtlMDUnlink(request->rq_bulk_md_h);
-                /* FIXME: wake whoever's sleeping on this bulk sending to let
-                 * -them- clean it up. */
-        }
-
         return 0;
 }
 
@@ -201,14 +211,12 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
                rc = ptl_send_rpc(req, &cl->cli_server);
        }
        if (rc) { 
-               CERROR("error %d, opcode %d\n", rc, 
-                      req->rq_reqhdr->opc); 
+                CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc);
                return -rc;
        }
 
         CDEBUG(0, "-- sleeping\n");
-        wait_event_interruptible(req->rq_wait_for_rep, 
-                                 ptlrpc_check_reply(req));
+        wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
         CDEBUG(0, "-- done\n");
         
         if (req->rq_flags == PTL_RPC_INTR) { 
@@ -224,7 +232,8 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
                 return -EINTR;
         }
 
-       rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep);
+       rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
+                                &req->rq_rephdr, &req->rq_rep);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                return rc;
@@ -232,9 +241,8 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
         CERROR("got rep %d\n", req->rq_rephdr->xid);
 
        if ( req->rq_rephdr->status == 0 )
-                CDEBUG(0, "--> buf %p len %d status %d\n",
-                      req->rq_repbuf, req->rq_replen, 
-                      req->rq_rephdr->status); 
+                CDEBUG(0, "--> buf %p len %d status %d\n", req->rq_repbuf,
+                       req->rq_replen, req->rq_rephdr->status);
 
        EXIT;
        return 0;
index c39d093..f3cfd1c 100644 (file)
@@ -33,6 +33,7 @@
 #include <linux/lustre_net.h>
 
 ptl_handle_eq_t sent_pkt_eq, rcvd_rep_eq, bulk_source_eq, bulk_sink_eq;
+static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL;
 
 /*
  *  Free the packet when it has gone out
@@ -131,7 +132,7 @@ int server_request_callback(ptl_event_t *ev, void *data)
 
 static int bulk_source_callback(ptl_event_t *ev, void *data)
 {
-        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr;
 
         ENTRY;
 
@@ -139,8 +140,8 @@ static int bulk_source_callback(ptl_event_t *ev, void *data)
                 CDEBUG(D_NET, "got SENT event\n");
         } else if (ev->type == PTL_EVENT_ACK) {
                 CDEBUG(D_NET, "got ACK event\n");
-                rpc->rq_bulkbuf = NULL;
-                wake_up_interruptible(&rpc->rq_wait_for_bulk);
+                bulk->b_flags = PTL_BULK_SENT;
+                wake_up_interruptible(&bulk->b_waitq);
         } else {
                 CERROR("Unexpected event type!\n");
                 BUG();
@@ -152,14 +153,15 @@ static int bulk_source_callback(ptl_event_t *ev, void *data)
 
 static int bulk_sink_callback(ptl_event_t *ev, void *data)
 {
-        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr;
 
         ENTRY;
 
         if (ev->type == PTL_EVENT_PUT) {
-                if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset)
+                if (bulk->b_buf != ev->mem_desc.start + ev->offset)
                         CERROR("bulkbuf != mem_desc -- why?\n");
-                //wake_up_interruptible(&rpc->rq_wait_for_bulk);
+                bulk->b_flags = PTL_BULK_RCVD;
+                wake_up_interruptible(&bulk->b_waitq);
         } else {
                 CERROR("Unexpected event type!\n");
                 BUG();
@@ -172,15 +174,20 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data)
 int ptlrpc_init_portals(void)
 {
         int rc;
-        const ptl_handle_ni_t *nip;
         ptl_handle_ni_t ni;
 
-        nip = inter_module_get_request(LUSTRE_NAL "_ni", LUSTRE_NAL);
-        if (nip == NULL) {
-                CERROR("get_ni failed: is the NAL module loaded?\n");
+        socknal_nip = inter_module_get_request("ksocknal_ni", "ksocknal");
+        qswnal_nip = inter_module_get_request("kqswnal_ni", "kqswnal");
+        if (socknal_nip == NULL && qswnal_nip == NULL) {
+                CERROR("get_ni failed: is a NAL module loaded?\n");
                 return -EIO;
         }
-        ni = *nip;
+
+        /* Use the qswnal if it's there */
+        if (qswnal_nip != NULL)
+                ni = *qswnal_nip;
+        else
+                ni = *socknal_nip;
 
         rc = PtlEQAlloc(ni, 128, sent_packet_callback, NULL, &sent_pkt_eq);
         if (rc != PTL_OK)
@@ -208,5 +215,8 @@ void ptlrpc_exit_portals(void)
         PtlEQFree(bulk_source_eq);
         PtlEQFree(bulk_sink_eq);
 
-        inter_module_put(LUSTRE_NAL "_ni");
+        if (qswnal_nip != NULL)
+                inter_module_put("kqswnal_ni");
+        if (socknal_nip != NULL)
+                inter_module_put("ksocknal_ni");
 }
index d391f9c..33a4900 100644 (file)
 #include <linux/lustre_net.h>
 
 extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq;
+static ptl_process_id_t local_id = {PTL_ADDR_GID, PTL_ID_ANY, PTL_ID_ANY};
 
 
+int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
+{
+        if (bulk->b_flags == PTL_BULK_SENT) {
+                EXIT;
+                return 1;
+        }
+
+        if (sigismember(&(current->pending.signal), SIGKILL) ||
+            sigismember(&(current->pending.signal), SIGINT)) {
+                bulk->b_flags = PTL_RPC_INTR;
+                EXIT;
+                return 1;
+        }
+
+        CERROR("no event yet\n");
+        return 0;
+}
 
 int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
                  int portal)
@@ -68,6 +86,7 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
                 break;
         default:
                 BUG();
+                return -1; /* notreached */
         }
         request->rq_req_md.options = PTL_MD_OP_PUT;
         request->rq_req_md.user_ptr = request;
@@ -97,6 +116,85 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
         return rc;
 }
 
+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal)
+{
+        int rc;
+        ptl_process_id_t remote_id;
+        ptl_handle_md_t md_h;
+
+        bulk->b_md.start = bulk->b_buf;
+        bulk->b_md.length = bulk->b_buflen;
+        bulk->b_md.eventq = bulk_source_eq;
+        bulk->b_md.threshold = 2; /* SENT and ACK events */
+        bulk->b_md.options = PTL_MD_OP_PUT;
+        bulk->b_md.user_ptr = bulk;
+
+        rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &md_h);
+        if (rc != 0) {
+                BUG();
+                CERROR("PtlMDBind failed: %d\n", rc);
+                return rc;
+        }
+
+        remote_id.addr_kind = PTL_ADDR_NID;
+        remote_id.nid = bulk->b_peer.peer_nid;
+        remote_id.pid = 0;
+
+        CERROR("Sending %d bytes to portal %d, xid %d\n",
+               bulk->b_md.length, portal, bulk->b_xid);
+
+        rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, bulk->b_xid, 0, 0);
+        if (rc != PTL_OK) {
+                BUG();
+                CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+                       portal, bulk->b_xid, rc);
+                /* FIXME: tear down md */
+        }
+
+        return rc;
+}
+
+int ptlrpc_wait_bulk(struct ptlrpc_bulk_desc *bulk)
+{
+        int rc;
+
+        ENTRY;
+
+        rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
+                          bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h);
+        if (rc != PTL_OK) {
+                CERROR("PtlMEAttach failed: %d\n", rc);
+                BUG();
+                EXIT;
+                goto cleanup1;
+        }
+
+        bulk->b_md.start = bulk->b_buf;
+        bulk->b_md.length = bulk->b_buflen;
+        bulk->b_md.threshold = 1;
+        bulk->b_md.options = PTL_MD_OP_PUT;
+        bulk->b_md.user_ptr = bulk;
+        bulk->b_md.eventq = bulk_sink_eq;
+
+        rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h);
+        if (rc != PTL_OK) {
+                CERROR("PtlMDAttach failed: %d\n", rc);
+                BUG();
+                EXIT;
+                goto cleanup2;
+        }
+
+        CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n",
+               bulk->b_buflen, bulk->b_xid, bulk->b_portal);
+
+ cleanup2:
+        PtlMEUnlink(bulk->b_me_h);
+ cleanup1:
+        PtlMDUnlink(bulk->b_md_h);
+
+        return rc;
+}
+
 int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
                  struct ptlrpc_request *req)
 {
@@ -162,7 +260,6 @@ int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
        return ptlrpc_reply(obddev, svc, req);
 }
 
-
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
 {
         ptl_process_id_t local_id;
@@ -190,9 +287,9 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         local_id.rid = PTL_ID_ANY;
 
         //CERROR("sending req %d\n", request->rq_xid);
-        rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
-                         request->rq_xid, 0, PTL_UNLINK,
-                         &request->rq_reply_me_h);
+        rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id,
+                          request->rq_xid, 0, PTL_UNLINK,
+                          &request->rq_reply_me_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMEAttach failed: %d\n", rc);
                 BUG();
@@ -217,40 +314,11 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 goto cleanup2;
         }
 
-        if (request->rq_bulklen != 0) {
-                rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
-                                 local_id, request->rq_xid, 0, PTL_UNLINK,
-                                 &request->rq_bulk_me_h);
-                if (rc != PTL_OK) {
-                        CERROR("PtlMEAttach failed: %d\n", rc);
-                        BUG();
-                        EXIT;
-                        goto cleanup3;
-                }
-
-                request->rq_bulk_md.start = request->rq_bulkbuf;
-                request->rq_bulk_md.length = request->rq_bulklen;
-                request->rq_bulk_md.threshold = 1;
-                request->rq_bulk_md.options = PTL_MD_OP_PUT;
-                request->rq_bulk_md.user_ptr = request;
-                request->rq_bulk_md.eventq = bulk_sink_eq;
-
-                rc = PtlMDAttach(request->rq_bulk_me_h, request->rq_bulk_md,
-                                 PTL_UNLINK, &request->rq_bulk_md_h);
-                if (rc != PTL_OK) {
-                        CERROR("PtlMDAttach failed: %d\n", rc);
-                        BUG();
-                        EXIT;
-                        goto cleanup4;
-                }
-        }
+        CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n",
+               request->rq_replen, request->rq_xid, request->rq_reply_portal);
 
         return ptl_send_buf(request, peer, request->rq_req_portal);
 
- cleanup4:
-        PtlMEUnlink(request->rq_bulk_me_h);
- cleanup3:
-        PtlMDUnlink(request->rq_reply_md_h);
  cleanup2:
         PtlMEUnlink(request->rq_reply_me_h);
  cleanup:
index 3767eda..f560a41 100644 (file)
@@ -37,10 +37,8 @@ extern int server_request_callback(ptl_event_t *ev, void *data);
 static int ptlrpc_check_event(struct ptlrpc_service *svc)
 {
         
-        if (sigismember(&(current->pending.signal),
-                        SIGKILL) ||
-            sigismember(&(current->pending.signal),
-                        SIGINT)) { 
+        if (sigismember(&(current->pending.signal), SIGKILL) ||
+            sigismember(&(current->pending.signal), SIGINT)) { 
                 svc->srv_flags |= SVC_KILLED;
                 EXIT;
                 return 1;
@@ -80,14 +78,9 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc)
         return 0;
 }
 
-struct ptlrpc_service *ptlrpc_init_svc(__u32 bufsize, 
-                                       int req_portal, 
-                                       int rep_portal, 
-                                       char *uuid, 
-                                       req_unpack_t unpack, 
-                                       rep_pack_t pack,
-                                       svc_handler_t handler
-                                       )
+struct ptlrpc_service *
+ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
+                req_unpack_t unpack, rep_pack_t pack, svc_handler_t handler)
 {
         int err;
         struct ptlrpc_service *svc;
@@ -215,7 +208,8 @@ void ptlrpc_stop_thread(struct ptlrpc_service *svc)
        svc->srv_flags = SVC_STOPPING;
 
         wake_up(&svc->srv_waitq);
-        wait_event_interruptible(svc->srv_ctl_waitq,  (svc->srv_flags & SVC_STOPPED));
+        wait_event_interruptible(svc->srv_ctl_waitq, 
+                                 (svc->srv_flags & SVC_STOPPED));
 }
 
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
@@ -336,8 +330,10 @@ int rpc_unregister_service(struct ptlrpc_service *service)
                 rc = PtlMEUnlink(service->srv_me_h[i]);
                 if (rc)
                         CERROR("PtlMEUnlink failed: %d\n", rc);
-        
-                OBD_FREE(service->srv_buf[i], service->srv_buf_size);                
+
+                if (service->srv_buf[i] != NULL)
+                        OBD_FREE(service->srv_buf[i], service->srv_buf_size);
+                service->srv_buf[i] = NULL;
         }
 
         rc = PtlEQFree(service->srv_eq_h);