Whamcloud - gitweb
- bug fix to unload modules with safe wait functions
[fs/lustre-release.git] / lustre / ptlrpc / rpc.c
index 82eb71a..3fdc05c 100644 (file)
 #include <linux/obd_support.h>
 #include <linux/lustre_net.h>
 
-static ptl_handle_eq_t req_eq, bulk_source_eq, bulk_sink_eq;
+static ptl_handle_eq_t sent_pkt_eq, rcvd_rep_eq, 
+        bulk_source_eq, bulk_sink_eq;
 
+
+/*
+ * Allocate a zeroed request, give it the client's next xid and let the
+ * client's cli_req_pack() method build the request header/buffer.
+ *
+ *   cl              client supplying the xid counter and the pack method
+ *   opcode          stored in the packed request header's opc field
+ *   name/namelen,
+ *   tgt/tgtlen      passed through unchanged to cli_req_pack()
+ *
+ * Returns the new request, or NULL if the allocation or the packing
+ * failed.  On a packing failure the request structure is freed again
+ * before returning.
+ */
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
+                                       int opcode, int namelen, char *name,
+                                       int tgtlen, char *tgt)
+{
+        struct ptlrpc_request *request;
+        int rc;
+        ENTRY;
+
+        OBD_ALLOC(request, sizeof(*request));
+        if (!request) {
+                CERROR("request allocation out of memory\n");
+                EXIT;
+                return NULL;
+        }
+
+        memset(request, 0, sizeof(*request));
+        request->rq_xid = cl->cli_xid++;
+
+        rc = cl->cli_req_pack(name, namelen, tgt, tgtlen,
+                              &request->rq_reqhdr, &request->rq_req,
+                              &request->rq_reqlen, &request->rq_reqbuf);
+        if (rc) {
+                CERROR("cannot pack request %d\n", rc);
+                /* nobody else has seen this request yet: don't leak it */
+                OBD_FREE(request, sizeof(*request));
+                EXIT;
+                return NULL;
+        }
+        request->rq_reqhdr->opc = opcode;
+
+        EXIT;
+        return request;
+}
+
+/*
+ * Release a request structure with OBD_FREE.  Only the structure itself
+ * is freed here.
+ */
+void ptlrpc_free_req(struct ptlrpc_request *request)
+{
+       OBD_FREE(request, sizeof(*request));
+}
+
+/*
+ * Deliver a request and sleep until its reply buffer shows up.
+ *
+ * If the client has a cli_enqueue method the request is delivered locally
+ * through it; otherwise the client's request and reply portals are copied
+ * into the request and it is sent with ptl_send_rpc().  The caller then
+ * sleeps on rq_wait_for_rep until rq_repbuf becomes non-NULL or a SIGKILL
+ * or SIGINT is pending, and finally unpacks the reply with the client's
+ * cli_rep_unpack() method.
+ *
+ * Returns 0 once the reply has been unpacked, -EINTR if the wait ended
+ * without a reply because of a signal, -rc if delivery failed with rc,
+ * or the error returned by cli_rep_unpack().
+ */
+int ptlrpc_queue_wait(struct ptlrpc_request *req,
+                             struct ptlrpc_client *cl)
+{
+        int rc;
+        DECLARE_WAITQUEUE(wait, current);
+        ENTRY;
+
+        init_waitqueue_head(&req->rq_wait_for_rep);
+
+        if (cl->cli_enqueue) {
+                /* Local delivery */
+                rc = cl->cli_enqueue(req);
+        } else {
+                /* Remote delivery via portals. */
+                req->rq_req_portal = cl->cli_request_portal;
+                req->rq_reply_portal = cl->cli_reply_portal;
+                rc = ptl_send_rpc(req, &cl->cli_server);
+        }
+        if (rc) {
+                CERROR("error %d, opcode %d\n", rc,
+                       req->rq_reqhdr->opc);
+                EXIT;
+                /* NOTE(review): assumes the delivery paths return positive
+                 * error codes - confirm the sign convention */
+                return -rc;
+        }
+
+        CDEBUG(0, "-- sleeping\n");
+        add_wait_queue(&req->rq_wait_for_rep, &wait);
+        for (;;) {
+                /*
+                 * Change state BEFORE testing rq_repbuf.  If the reply
+                 * arrives after the test, the wake-up puts us back into
+                 * TASK_RUNNING and schedule() returns right away; testing
+                 * first would let that wake-up slip by unnoticed.
+                 */
+                set_current_state(TASK_INTERRUPTIBLE);
+                if (req->rq_repbuf != NULL)
+                        break;
+
+                /* if this process really wants to die, let it go */
+                if (sigismember(&(current->pending.signal), SIGKILL) ||
+                    sigismember(&(current->pending.signal), SIGINT))
+                        break;
+
+                schedule();
+        }
+        remove_wait_queue(&req->rq_wait_for_rep, &wait);
+        set_current_state(TASK_RUNNING);
+        CDEBUG(0, "-- done\n");
+
+        if (req->rq_repbuf == NULL) {
+                /* We broke out because of a signal */
+                EXIT;
+                return -EINTR;
+        }
+
+        rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
+                                &req->rq_rephdr, &req->rq_rep);
+        if (rc) {
+                CERROR("unpack_rep failed: %d\n", rc);
+                EXIT;
+                return rc;
+        }
+
+        if (req->rq_rephdr->status == 0)
+                CDEBUG(0, "--> buf %p len %d status %d\n",
+                       req->rq_repbuf, req->rq_replen,
+                       req->rq_rephdr->status);
+
+        EXIT;
+        return 0;
+}
 /*
- * 1. Free the request buffer after it has gone out on the wire
- * 2. Wake up the thread waiting for the reply once it comes in.
+ *  Free the outgoing packet buffer once it has been sent
  */
-static int client_packet_callback(ptl_event_t *ev, void *data)
+static int sent_packet_callback(ptl_event_t *ev, void *data)
 {
-        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
-
         ENTRY;
-        // XXX make sure we understand all events, including ACK's
 
         if (ev->type == PTL_EVENT_SENT) {
                 OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
-        } else if (ev->type == PTL_EVENT_PUT) {
+        } else { 
+                // XXX make sure we understand all events, including ACK's
+                CERROR("Unknown event %d\n", ev->type); 
+                BUG();
+        }
+
+        EXIT;
+        return 1;
+}
+
+/*
+ * Wake up the thread waiting for the reply once it comes in.
+ */
+static int rcvd_reply_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+        ENTRY;
+
+        if (ev->type == PTL_EVENT_PUT) {
                 rpc->rq_repbuf = ev->mem_desc.start + ev->offset;
                 wake_up_interruptible(&rpc->rq_wait_for_rep);
+        } else { 
+                // XXX make sure we understand all events, including ACK's
+                CERROR("Unknown event %d\n", ev->type); 
+                BUG();
         }
 
         EXIT;
@@ -120,6 +235,7 @@ static int bulk_source_callback(ptl_event_t *ev, void *data)
                 wake_up_interruptible(&rpc->rq_wait_for_bulk);
         } else {
                 CERROR("Unexpected event type!\n");
+                BUG();
         }
 
         EXIT;
@@ -138,6 +254,7 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data)
                 wake_up_interruptible(&rpc->rq_wait_for_bulk);
         } else {
                 CERROR("Unexpected event type!\n");
+                BUG();
         }
 
         EXIT;
@@ -159,11 +276,11 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
         } else if (is_request) {
                 request->rq_req_md.start = request->rq_reqbuf;
                 request->rq_req_md.length = request->rq_reqlen;
-                request->rq_req_md.eventq = req_eq;
+                request->rq_req_md.eventq = sent_pkt_eq;
         } else {
                 request->rq_req_md.start = request->rq_repbuf;
                 request->rq_req_md.length = request->rq_replen;
-                request->rq_req_md.eventq = req_eq;
+                request->rq_req_md.eventq = sent_pkt_eq;
         }
         request->rq_req_md.threshold = 1;
         request->rq_req_md.options = PTL_MD_OP_PUT;
@@ -171,6 +288,7 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
 
         rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
         if (rc != 0) {
+                BUG();
                 CERROR("PtlMDBind failed: %d\n", rc);
                 return rc;
         }
@@ -187,6 +305,7 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
                             request->rq_xid, 0, 0);
         }
         if (rc != PTL_OK) {
+                BUG();
                 CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
                        portal, request->rq_xid, rc);
                 /* FIXME: tear down md */
@@ -222,10 +341,12 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         local_id.gid = PTL_ID_ANY;
         local_id.rid = PTL_ID_ANY;
 
+        CERROR("sending req %d\n", request->rq_xid);
         rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
                          request->rq_xid, 0, PTL_UNLINK, &me_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMEAttach failed: %d\n", rc);
+                BUG();
                 EXIT;
                 goto cleanup;
         }
@@ -235,12 +356,13 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         request->rq_reply_md.threshold = 1;
         request->rq_reply_md.options = PTL_MD_OP_PUT;
         request->rq_reply_md.user_ptr = request;
-        request->rq_reply_md.eventq = req_eq;
+        request->rq_reply_md.eventq = rcvd_rep_eq;
 
         rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
                          &request->rq_reply_md_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMDAttach failed: %d\n", rc);
+                BUG();
                 EXIT;
                 goto cleanup2;
         }
@@ -251,6 +373,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                                  &bulk_me_h);
                 if (rc != PTL_OK) {
                         CERROR("PtlMEAttach failed: %d\n", rc);
+                        BUG();
                         EXIT;
                         goto cleanup3;
                 }
@@ -266,6 +389,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                                  &request->rq_bulk_md_h);
                 if (rc != PTL_OK) {
                         CERROR("PtlMDAttach failed: %d\n", rc);
+                        BUG();
                         EXIT;
                         goto cleanup4;
                 }
@@ -326,7 +450,8 @@ int ptl_received_rpc(struct ptlrpc_service *service) {
 
                 CDEBUG(D_INFO, "Attach MD in ring, rc %d\n", rc);
                 if (rc != PTL_OK) {
-                        /* cleanup */
+                        /* XXX cleanup */
+                        BUG();
                         CERROR("PtlMDAttach failed: %d\n", rc);
                         return rc;
                 }
@@ -453,7 +578,11 @@ static int req_init_portals(void)
         }
         ni = *nip;
 
-        rc = PtlEQAlloc(ni, 128, client_packet_callback, NULL, &req_eq);
+        rc = PtlEQAlloc(ni, 128, sent_packet_callback, NULL, &sent_pkt_eq);
+        if (rc != PTL_OK)
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+
+        rc = PtlEQAlloc(ni, 128, rcvd_reply_callback, NULL, &rcvd_rep_eq);
         if (rc != PTL_OK)
                 CERROR("PtlEQAlloc failed: %d\n", rc);
 
@@ -475,7 +604,8 @@ static int __init ptlrpc_init(void)
 
 static void __exit ptlrpc_exit(void)
 {
-        PtlEQFree(req_eq);
+        PtlEQFree(sent_pkt_eq);
+        PtlEQFree(rcvd_rep_eq);
         PtlEQFree(bulk_source_eq);
         PtlEQFree(bulk_sink_eq);