Whamcloud - gitweb
- added rq_type field to ptlrpc_request
author: pschwan <pschwan>
Thu, 28 Feb 2002 03:05:25 +0000 (03:05 +0000)
committer: pschwan <pschwan>
Thu, 28 Feb 2002 03:05:25 +0000 (03:05 +0000)
- this field must be one of PTLRPC_REQUEST, PTLRPC_REPLY, PTLRPC_BULK
- mdc_request now waits for directory pages to be sent via the bulk portal
- change MDS bulk waitq to the new style
- ptl_send_buf no longer takes the 'is_request' argument; this is encoded in
  the rq_type mentioned above
- added ptl_abort_rpc() to tear down reply structures for aborted requests.
- refactored ptl_send_buf code to rid ourselves of some of the more gross
  bulklen hacks

lustre/include/linux/lustre_net.h
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/rpc.c
lustre/tests/llmount.sh

index 2aa6862..0efec74 100644 (file)
@@ -74,18 +74,18 @@ struct ptlrpc_service {
                         int *replen, char **repbuf); 
 };
 
-
 struct ptlrpc_request { 
+        int rq_type; /* one of PTLRPC_REQUEST, PTLRPC_REPLY, PTLRPC_BULK */
        struct list_head rq_list;
        struct mds_obd *rq_obd;
        struct ost_obd *rq_ost;
        int rq_status;
+        __u32 rq_xid;
 
        char *rq_reqbuf;
        int rq_reqlen;
        struct ptlreq_hdr *rq_reqhdr;
        union ptl_req rq_req;
-        __u32 rq_xid;
 
        char *rq_repbuf;
        int rq_replen;
@@ -96,15 +96,18 @@ struct ptlrpc_request {
         int rq_bulklen;
         int (*rq_bulk_cb)(struct ptlrpc_request *, void *);
 
-        void * rq_reply_handle;
+        void *rq_reply_handle;
        wait_queue_head_t rq_wait_for_rep;
        wait_queue_head_t rq_wait_for_bulk;
 
         ptl_md_t rq_reply_md;
         ptl_handle_md_t rq_reply_md_h;
+        ptl_handle_me_t rq_reply_me_h;
+
         ptl_md_t rq_req_md;
         ptl_md_t rq_bulk_md;
         ptl_handle_md_t rq_bulk_md_h;
+        ptl_handle_me_t rq_bulk_me_h;
         __u32 rq_reply_portal;
         __u32 rq_req_portal;
         __u32 rq_bulk_portal;
@@ -125,11 +128,13 @@ struct ptlrpc_client {
         int (*cli_enqueue)(struct ptlrpc_request *req);
 };
 
-
-
 /* rpc/rpc.c */
+#define PTLRPC_REQUEST 1
+#define PTLRPC_REPLY   2
+#define PTLRPC_BULK    3
+
 int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
-                 int portal, int is_request);
+                 int portal);
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer);
 int ptl_received_rpc(struct ptlrpc_service *service);
 int rpc_register_service(struct ptlrpc_service *service, char *uuid);
index 8d322ad..b027372 100644 (file)
@@ -176,7 +176,7 @@ int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset,
         CDEBUG(D_INODE, "inode: %ld\n", ino);
 
        request = ptlrpc_prep_req(peer, MDS_READPAGE, 0, NULL,
-                              sizeof(struct niobuf), (char *)&niobuf);
+                                  sizeof(struct niobuf), (char *)&niobuf);
        if (!request) { 
                CERROR("mdc request: cannot pack\n");
                return -ENOMEM;
@@ -187,8 +187,8 @@ int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset,
        request->rq_req.mds->size = offset;
        request->rq_req.mds->tgtlen = sizeof(niobuf); 
 
-        //request->rq_bulklen = PAGE_SIZE;
-        //request->rq_bulkbuf = (void *)(long)niobuf.addr;
+        request->rq_bulklen = PAGE_SIZE;
+        request->rq_bulkbuf = (void *)(long)niobuf.addr;
        request->rq_bulk_portal = MDS_BULK_PORTAL;
        request->rq_replen = 
                sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep);
index 3b0f965..d2b611e 100644 (file)
@@ -92,6 +92,7 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                        return -EIO;
        } else {
                char *buf;
+                DECLARE_WAITQUEUE(wait, current);
 
                OBD_ALLOC(buf, PAGE_SIZE);
                if (!buf)
@@ -106,11 +107,33 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                        return -EIO;
                 }
 
+                req->rq_type = PTLRPC_BULK;
                req->rq_bulkbuf = buf;
                req->rq_bulklen = PAGE_SIZE;
+
                init_waitqueue_head(&req->rq_wait_for_bulk);
-               rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
-               sleep_on(&req->rq_wait_for_bulk);
+               rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL);
+                add_wait_queue(&req->rq_wait_for_bulk, &wait);
+                /* The bulk callback will set rq->bulkbuf to NULL when it's
+                 * been ACKed and it's finished using it. */
+                while (req->rq_bulkbuf != NULL) {
+                        set_current_state(TASK_INTERRUPTIBLE);
+
+                        /* if this process really wants to die, let it go */
+                        if (sigismember(&(current->pending.signal), SIGKILL) ||
+                            sigismember(&(current->pending.signal), SIGINT))
+                                break;
+
+                        schedule();
+                }
+                remove_wait_queue(&req->rq_wait_for_bulk, &wait);
+                set_current_state(TASK_RUNNING);
+
+                if (req->rq_bulkbuf != NULL) {
+                        EXIT;
+                        return -EINTR;
+                }
+
                 OBD_FREE(buf, PAGE_SIZE);
                req->rq_bulklen = 0; /* FIXME: eek. */
        }
@@ -128,7 +151,8 @@ int mds_reply(struct ptlrpc_request *req)
                /* This is a request that came from the network via portals. */
 
                /* FIXME: we need to increment the count of handled events */
-               ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
+                req->rq_type = PTLRPC_REPLY;
+               ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL);
        } else {
                /* This is a local request that came from another thread. */
 
index bbb2f96..1b9dd03 100644 (file)
@@ -422,9 +422,10 @@ int osc_sendpage(struct ptlrpc_request *req, struct niobuf *dst,
 
                 memcpy(buf, (char *)(unsigned long)src->addr, src->len);
 
+                req->rq_type = PTLRPC_BULK;
                 req->rq_bulkbuf = buf;
                 req->rq_bulklen = src->len;
-                rc = ptl_send_buf(req, &req->rq_peer, OST_BULK_PORTAL, 0);
+                rc = ptl_send_buf(req, &req->rq_peer, OST_BULK_PORTAL);
                 init_waitqueue_head(&req->rq_wait_for_bulk);
                 sleep_on(&req->rq_wait_for_bulk);
                 OBD_FREE(buf, src->len);
index 3ba12c7..e45c6f9 100644 (file)
@@ -98,7 +98,8 @@ int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
                /* This is a request that came from the network via portals. */
 
                /* FIXME: we need to increment the count of handled events */
-               ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0);
+                req->rq_type = PTLRPC_REPLY;
+               ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL);
        } else {
                /* This is a local request that came from another thread. */
 
index 6a7b7c9..d7f46a1 100644 (file)
@@ -71,8 +71,23 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
        OBD_FREE(request, sizeof(*request));
 }
 
-int ptlrpc_queue_wait(struct ptlrpc_request *req, 
-                             struct ptlrpc_client *cl)
+/* Abort this request and cleanup any resources associated with it. */
+int ptl_abort_rpc(struct ptlrpc_request *request)
+{
+        /* First remove the MD for the reply; in theory, this means
+         * that we can tear down the buffer safely. */
+        PtlMEUnlink(request->rq_reply_me_h);
+        PtlMDUnlink(request->rq_reply_md_h);
+
+        if (request->rq_bulklen != 0) {
+                PtlMEUnlink(request->rq_bulk_me_h);
+                PtlMDUnlink(request->rq_bulk_md_h);
+        }
+
+        return 0;
+}
+
+int ptlrpc_queue_wait(struct ptlrpc_request *req, struct ptlrpc_client *cl)
 {
        int rc;
         DECLARE_WAITQUEUE(wait, current);
@@ -112,12 +127,15 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req,
         CDEBUG(0, "-- done\n");
 
         if (req->rq_repbuf == NULL) {
-                /* We broke out because of a signal */
+                /* We broke out because of a signal.  Clean up the dangling
+                 * reply buffers! */
+                ptl_abort_rpc(req);
                 EXIT;
                 return -EINTR;
         }
 
-       rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep);
+       rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr,
+                                &req->rq_rep);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                return rc;
@@ -234,6 +252,7 @@ static int bulk_source_callback(ptl_event_t *ev, void *data)
                 CDEBUG(D_NET, "got SENT event\n");
         } else if (ev->type == PTL_EVENT_ACK) {
                 CDEBUG(D_NET, "got ACK event\n");
+                rpc->rq_bulkbuf = NULL;
                 wake_up_interruptible(&rpc->rq_wait_for_bulk);
         } else {
                 CERROR("Unexpected event type!\n");
@@ -253,7 +272,7 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data)
         if (ev->type == PTL_EVENT_PUT) {
                 if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset)
                         CERROR("bulkbuf != mem_desc -- why?\n");
-                wake_up_interruptible(&rpc->rq_wait_for_bulk);
+                //wake_up_interruptible(&rpc->rq_wait_for_bulk);
         } else {
                 CERROR("Unexpected event type!\n");
                 BUG();
@@ -264,27 +283,38 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data)
 }
 
 int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
-                 int portal, int is_request)
+                 int portal)
 {
         int rc;
         ptl_process_id_t remote_id;
         ptl_handle_md_t md_h;
+        ptl_ack_req_t ack;
 
-        /* FIXME: This is bad. */
-        if (request->rq_bulklen) {
+        switch (request->rq_type) {
+        case PTLRPC_BULK:
                 request->rq_req_md.start = request->rq_bulkbuf;
                 request->rq_req_md.length = request->rq_bulklen;
                 request->rq_req_md.eventq = bulk_source_eq;
-        } else if (is_request) {
+                request->rq_req_md.threshold = 2; /* SENT and ACK events */
+                ack = PTL_ACK_REQ;
+                break;
+        case PTLRPC_REQUEST:
                 request->rq_req_md.start = request->rq_reqbuf;
                 request->rq_req_md.length = request->rq_reqlen;
                 request->rq_req_md.eventq = sent_pkt_eq;
-        } else {
+                request->rq_req_md.threshold = 1;
+                ack = PTL_NOACK_REQ;
+                break;
+        case PTLRPC_REPLY:
                 request->rq_req_md.start = request->rq_repbuf;
                 request->rq_req_md.length = request->rq_replen;
                 request->rq_req_md.eventq = sent_pkt_eq;
+                request->rq_req_md.threshold = 1;
+                ack = PTL_NOACK_REQ;
+                break;
+        default:
+                BUG();
         }
-        request->rq_req_md.threshold = 1;
         request->rq_req_md.options = PTL_MD_OP_PUT;
         request->rq_req_md.user_ptr = request;
 
@@ -299,13 +329,10 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
         remote_id.nid = peer->peer_nid;
         remote_id.pid = 0;
 
-        if (request->rq_bulklen) {
-                rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
-                            request->rq_xid, 0, 0);
-        } else {
-                rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
-                            request->rq_xid, 0, 0);
-        }
+        CERROR("Sending %d bytes to portal %d, xid %d\n",
+               request->rq_req_md.length, portal, request->rq_xid);
+
+        rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0);
         if (rc != PTL_OK) {
                 BUG();
                 CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
@@ -318,7 +345,6 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
 
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
 {
-        ptl_handle_me_t me_h, bulk_me_h;
         ptl_process_id_t local_id;
         int rc;
         char *repbuf;
@@ -345,7 +371,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
 
         CERROR("sending req %d\n", request->rq_xid);
         rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
-                         request->rq_xid, 0, PTL_UNLINK, &me_h);
+                         request->rq_xid, 0, PTL_UNLINK,
+                         &request->rq_reply_me_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMEAttach failed: %d\n", rc);
                 BUG();
@@ -353,6 +380,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 goto cleanup;
         }
 
+        request->rq_type = PTLRPC_REQUEST;
         request->rq_reply_md.start = repbuf;
         request->rq_reply_md.length = request->rq_replen;
         request->rq_reply_md.threshold = 1;
@@ -360,8 +388,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         request->rq_reply_md.user_ptr = request;
         request->rq_reply_md.eventq = rcvd_rep_eq;
 
-        rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
-                         &request->rq_reply_md_h);
+        rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
+                         PTL_UNLINK, &request->rq_reply_md_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMDAttach failed: %d\n", rc);
                 BUG();
@@ -372,7 +400,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         if (request->rq_bulklen != 0) {
                 rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
                                  local_id, request->rq_xid, 0, PTL_UNLINK,
-                                 &bulk_me_h);
+                                 &request->rq_bulk_me_h);
                 if (rc != PTL_OK) {
                         CERROR("PtlMEAttach failed: %d\n", rc);
                         BUG();
@@ -387,7 +415,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 request->rq_bulk_md.user_ptr = request;
                 request->rq_bulk_md.eventq = bulk_sink_eq;
 
-                rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
+                rc = PtlMDAttach(request->rq_bulk_me_h,
+                                 request->rq_bulk_md, PTL_UNLINK,
                                  &request->rq_bulk_md_h);
                 if (rc != PTL_OK) {
                         CERROR("PtlMDAttach failed: %d\n", rc);
@@ -397,14 +426,14 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 }
         }
 
-        return ptl_send_buf(request, peer, request->rq_req_portal, 1);
+        return ptl_send_buf(request, peer, request->rq_req_portal);
 
  cleanup4:
-        PtlMEUnlink(bulk_me_h);
+        PtlMEUnlink(request->rq_bulk_me_h);
  cleanup3:
         PtlMDUnlink(request->rq_reply_md_h);
  cleanup2:
-        PtlMEUnlink(me_h);
+        PtlMEUnlink(request->rq_reply_me_h);
  cleanup:
         OBD_FREE(repbuf, request->rq_replen);
 
index bf0e991..bc888bb 100755 (executable)
@@ -62,4 +62,4 @@ quit
 EOF
 
 mkdir /mnt/obd
-mount -t lustre_light -o device=3 none /mnt/obd
+mount -t lustre_light -o device=3 none /mnt/obd