Whamcloud - gitweb
Merged branch 'peter' with the tip. Pre-merge tag is 't_20020302_networking'.
[fs/lustre-release.git] / lustre / ptlrpc / niobuf.c
index f019560..33a4900 100644 (file)
@@ -1,4 +1,3 @@
-
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
 #include <linux/lustre_net.h>
 
 extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq;
+static ptl_process_id_t local_id = {PTL_ADDR_GID, PTL_ID_ANY, PTL_ID_ANY};
+
+
+int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
+{
+        if (bulk->b_flags == PTL_BULK_SENT) {
+                EXIT;
+                return 1;
+        }
+
+        if (sigismember(&(current->pending.signal), SIGKILL) ||
+            sigismember(&(current->pending.signal), SIGINT)) {
+                bulk->b_flags = PTL_RPC_INTR;
+                EXIT;
+                return 1;
+        }
+
+        CERROR("no event yet\n");
+        return 0;
+}
 
 int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
-                 int portal, int is_request)
+                 int portal)
 {
         int rc;
         ptl_process_id_t remote_id;
         ptl_handle_md_t md_h;
+        ptl_ack_req_t ack;
 
-        /* FIXME: This is bad. */
-        if (request->rq_bulklen) {
+        switch (request->rq_type) {
+        case PTL_RPC_BULK:
                 request->rq_req_md.start = request->rq_bulkbuf;
                 request->rq_req_md.length = request->rq_bulklen;
                 request->rq_req_md.eventq = bulk_source_eq;
-        } else if (is_request) {
+                request->rq_req_md.threshold = 2; /* SENT and ACK events */
+                ack = PTL_ACK_REQ;
+                break;
+        case PTL_RPC_REQUEST:
                 request->rq_req_md.start = request->rq_reqbuf;
                 request->rq_req_md.length = request->rq_reqlen;
                 request->rq_req_md.eventq = sent_pkt_eq;
-        } else {
+                request->rq_req_md.threshold = 1;
+                ack = PTL_NOACK_REQ;
+                break;
+        case PTL_RPC_REPLY:
                 request->rq_req_md.start = request->rq_repbuf;
                 request->rq_req_md.length = request->rq_replen;
                 request->rq_req_md.eventq = sent_pkt_eq;
+                request->rq_req_md.threshold = 1;
+                ack = PTL_NOACK_REQ;
+                break;
+        default:
+                BUG();
+                return -1; /* notreached */
         }
-        request->rq_req_md.threshold = 1;
         request->rq_req_md.options = PTL_MD_OP_PUT;
         request->rq_req_md.user_ptr = request;
 
@@ -71,13 +102,10 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
         remote_id.nid = peer->peer_nid;
         remote_id.pid = 0;
 
-        if (request->rq_bulklen) {
-                rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
-                            request->rq_xid, 0, 0);
-        } else {
-                rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
-                            request->rq_xid, 0, 0);
-        }
+        CERROR("Sending %d bytes to portal %d, xid %d\n",
+               request->rq_req_md.length, portal, request->rq_xid);
+
+        rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0);
         if (rc != PTL_OK) {
                 BUG();
                 CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
@@ -88,9 +116,152 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
         return rc;
 }
 
+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal)
+{
+        int rc;
+        ptl_process_id_t remote_id;
+        ptl_handle_md_t md_h;
+
+        bulk->b_md.start = bulk->b_buf;
+        bulk->b_md.length = bulk->b_buflen;
+        bulk->b_md.eventq = bulk_source_eq;
+        bulk->b_md.threshold = 2; /* SENT and ACK events */
+        bulk->b_md.options = PTL_MD_OP_PUT;
+        bulk->b_md.user_ptr = bulk;
+
+        rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &md_h);
+        if (rc != 0) {
+                BUG();
+                CERROR("PtlMDBind failed: %d\n", rc);
+                return rc;
+        }
+
+        remote_id.addr_kind = PTL_ADDR_NID;
+        remote_id.nid = bulk->b_peer.peer_nid;
+        remote_id.pid = 0;
+
+        CERROR("Sending %d bytes to portal %d, xid %d\n",
+               bulk->b_md.length, portal, bulk->b_xid);
+
+        rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, bulk->b_xid, 0, 0);
+        if (rc != PTL_OK) {
+                BUG();
+                CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+                       portal, bulk->b_xid, rc);
+                /* FIXME: tear down md */
+        }
+
+        return rc;
+}
+
+int ptlrpc_wait_bulk(struct ptlrpc_bulk_desc *bulk)
+{
+        int rc;
+
+        ENTRY;
+
+        rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
+                          bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h);
+        if (rc != PTL_OK) {
+                CERROR("PtlMEAttach failed: %d\n", rc);
+                BUG();
+                EXIT;
+                goto cleanup1;
+        }
+
+        bulk->b_md.start = bulk->b_buf;
+        bulk->b_md.length = bulk->b_buflen;
+        bulk->b_md.threshold = 1;
+        bulk->b_md.options = PTL_MD_OP_PUT;
+        bulk->b_md.user_ptr = bulk;
+        bulk->b_md.eventq = bulk_sink_eq;
+
+        rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h);
+        if (rc != PTL_OK) {
+                CERROR("PtlMDAttach failed: %d\n", rc);
+                BUG();
+                EXIT;
+                goto cleanup2;
+        }
+
+        CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n",
+               bulk->b_buflen, bulk->b_xid, bulk->b_portal);
+
+ cleanup2:
+        PtlMEUnlink(bulk->b_me_h);
+ cleanup1:
+        PtlMDUnlink(bulk->b_md_h);
+
+        return rc;
+}
+
+int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
+                 struct ptlrpc_request *req)
+{
+       struct ptlrpc_request *clnt_req = req->rq_reply_handle;
+       ENTRY;
+
+       if (req->rq_reply_handle == NULL) {
+               /* This is a request that came from the network via portals. */
+
+               /* FIXME: we need to increment the count of handled events */
+                req->rq_type = PTL_RPC_REPLY;
+                req->rq_reqhdr->xid = req->rq_reqhdr->xid;
+               ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal);
+       } else {
+               /* This is a local request that came from another thread. */
+
+               /* move the reply to the client */ 
+               clnt_req->rq_replen = req->rq_replen;
+               clnt_req->rq_repbuf = req->rq_repbuf;
+               req->rq_repbuf = NULL;
+               req->rq_replen = 0;
+
+               /* free the request buffer */
+               OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
+               req->rq_reqbuf = NULL;
+
+               /* wake up the client */ 
+               wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
+       }
+
+       EXIT;
+       return 0;
+}
+
+int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
+                 struct ptlrpc_request *req)
+{
+       struct ptlrep_hdr *hdr;
+
+       ENTRY;
+
+       OBD_ALLOC(hdr, sizeof(*hdr));
+       if (!hdr) { 
+               EXIT;
+               return -ENOMEM;
+       }
+
+       memset(hdr, 0, sizeof(*hdr));
+       
+       hdr->xid = req->rq_reqhdr->xid;
+       hdr->status = req->rq_status; 
+       hdr->type = OST_TYPE_ERR;
+
+        if (req->rq_repbuf) { 
+                CERROR("req has repbuf\n");
+                BUG();
+        }
+
+       req->rq_repbuf = (char *)hdr;
+       req->rq_replen = sizeof(*hdr); 
+
+       EXIT;
+       return ptlrpc_reply(obddev, svc, req);
+}
+
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
 {
-        ptl_handle_me_t me_h, bulk_me_h;
         ptl_process_id_t local_id;
         int rc;
         char *repbuf;
@@ -116,8 +287,9 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         local_id.rid = PTL_ID_ANY;
 
         //CERROR("sending req %d\n", request->rq_xid);
-        rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
-                         request->rq_xid, 0, PTL_UNLINK, &me_h);
+        rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id,
+                          request->rq_xid, 0, PTL_UNLINK,
+                          &request->rq_reply_me_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMEAttach failed: %d\n", rc);
                 BUG();
@@ -125,6 +297,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 goto cleanup;
         }
 
+        request->rq_type = PTL_RPC_REQUEST;
         request->rq_reply_md.start = repbuf;
         request->rq_reply_md.length = request->rq_replen;
         request->rq_reply_md.threshold = 1;
@@ -132,8 +305,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         request->rq_reply_md.user_ptr = request;
         request->rq_reply_md.eventq = rcvd_rep_eq;
 
-        rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
-                         &request->rq_reply_md_h);
+        rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
+                         PTL_UNLINK, &request->rq_reply_md_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMDAttach failed: %d\n", rc);
                 BUG();
@@ -141,42 +314,13 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 goto cleanup2;
         }
 
-        if (request->rq_bulklen != 0) {
-                rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
-                                 local_id, request->rq_xid, 0, PTL_UNLINK,
-                                 &bulk_me_h);
-                if (rc != PTL_OK) {
-                        CERROR("PtlMEAttach failed: %d\n", rc);
-                        BUG();
-                        EXIT;
-                        goto cleanup3;
-                }
-
-                request->rq_bulk_md.start = request->rq_bulkbuf;
-                request->rq_bulk_md.length = request->rq_bulklen;
-                request->rq_bulk_md.threshold = 1;
-                request->rq_bulk_md.options = PTL_MD_OP_PUT;
-                request->rq_bulk_md.user_ptr = request;
-                request->rq_bulk_md.eventq = bulk_sink_eq;
-
-                rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
-                                 &request->rq_bulk_md_h);
-                if (rc != PTL_OK) {
-                        CERROR("PtlMDAttach failed: %d\n", rc);
-                        BUG();
-                        EXIT;
-                        goto cleanup4;
-                }
-        }
+        CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n",
+               request->rq_replen, request->rq_xid, request->rq_reply_portal);
 
-        return ptl_send_buf(request, peer, request->rq_req_portal, 1);
+        return ptl_send_buf(request, peer, request->rq_req_portal);
 
- cleanup4:
-        PtlMEUnlink(bulk_me_h);
- cleanup3:
-        PtlMDUnlink(request->rq_reply_md_h);
  cleanup2:
-        PtlMEUnlink(me_h);
+        PtlMEUnlink(request->rq_reply_me_h);
  cleanup:
         OBD_FREE(repbuf, request->rq_replen);