Whamcloud - gitweb
fix rawrpc, don't drop buffer before rpc really finished on wire, portals
authorericm <ericm>
Tue, 7 Jun 2005 00:53:04 +0000 (00:53 +0000)
committerericm <ericm>
Tue, 7 Jun 2005 00:53:04 +0000 (00:53 +0000)
callback might arrive very late.

lustre/include/linux/lustre_net.h
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/ptlrpc_module.c
lustre/sec/gss/sec_gss.c

index f791734..7e1aa52 100644 (file)
@@ -559,6 +559,7 @@ extern struct ptlrpc_ni ptlrpc_interfaces[];
 extern int              ptlrpc_ninterfaces;
 extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid, struct ptlrpc_peer *peer);
 extern void request_out_callback (ptl_event_t *ev);
+extern void rawrpc_request_out_callback(ptl_event_t *ev);
 extern void reply_in_callback(ptl_event_t *ev);
 extern void client_bulk_callback (ptl_event_t *ev);
 extern void request_in_callback(ptl_event_t *ev);
@@ -600,8 +601,15 @@ int ptlrpc_error(struct ptlrpc_request *req);
 void ptlrpc_resend_req(struct ptlrpc_request *request);
 int ptl_send_rpc(struct ptlrpc_request *request);
 int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
-int ptlrpc_do_rawrpc(struct obd_import *imp, char *reqbuf, int reqlen,
-                     char *repbuf, int *replenp, int timeout);
+
+struct ptlrpc_request * ptl_do_rawrpc(struct obd_import *imp,
+                                      char *reqbuf, int reqbuf_len, int reqlen,
+                                      char *repbuf, int repbuf_len,
+                                      int *replenp, int timeout, int *res);
+int ptl_do_rawrpc_simple(struct obd_import *imp,
+                         char *reqbuf, int reqlen,
+                         char *repbuf, int *replenp);
+void rawrpc_req_finished(struct ptlrpc_request *req);
 
 /* ptlrpc/client.c */
 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
index 20fde70..c39a7a2 100644 (file)
@@ -332,6 +332,18 @@ void server_bulk_callback (ptl_event_t *ev)
         EXIT;
 }
 
+void rawrpc_request_out_callback(ptl_event_t *ev)
+{
+        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
+        struct ptlrpc_request *req = cbid->cbid_arg;
+
+        /* we suppose request_out_callback() will drop the
+         * reference by 1
+         */
+        request_out_callback(ev);
+        rawrpc_req_finished(req);
+}
+
 static void ptlrpc_master_callback(ptl_event_t *ev)
 {
         struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
@@ -344,7 +356,8 @@ static void ptlrpc_master_callback(ptl_event_t *ev)
                  callback == client_bulk_callback ||
                  callback == request_in_callback ||
                  callback == reply_out_callback ||
-                 callback == server_bulk_callback);
+                 callback == server_bulk_callback ||
+                 callback == rawrpc_request_out_callback);
         
         callback (ev);
 }
index 7cdb908..f9b4ae0 100644 (file)
@@ -557,6 +557,11 @@ int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
         return (-ENOMEM);
 }
 
+
+/********************************************
+ * rawrpc stuff, currently only used by gss *
+ ********************************************/
+
 static int rawrpc_timedout(void *data)
 {
         struct ptlrpc_request *req = (struct ptlrpc_request *) data;
@@ -570,6 +575,11 @@ static int rawrpc_timedout(void *data)
         return 1;
 }
 
+static int rawrpc_timedout_wait(void *data)
+{
+        return 0;
+}
+
 /* to make things as simple as possible */
 static int rawrpc_check_reply(struct ptlrpc_request *req)
 {
@@ -583,17 +593,50 @@ static int rawrpc_check_reply(struct ptlrpc_request *req)
         return rc;
 }
 
+void rawrpc_req_finished(struct ptlrpc_request *req)
+{
+        struct obd_import *imp;
+        unsigned long irq_flags;
+
+        if (!req)
+                return;
+
+        if (!atomic_dec_and_test(&req->rq_refcount))
+                return;
+
+        LASSERT(req->rq_import);
+        imp = req->rq_import;
+
+        spin_lock_irqsave(&imp->imp_lock, irq_flags);
+        list_del_init(&req->rq_list);
+        spin_unlock_irqrestore(&imp->imp_lock, irq_flags);
+
+        class_import_put(imp);
+
+        if (req->rq_reqbuf) {
+                LASSERT(req->rq_reqbuf_len);
+                OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
+        }
+        if (req->rq_repbuf) {
+                LASSERT(req->rq_repbuf_len);
+                OBD_FREE(req->rq_repbuf, req->rq_repbuf_len);
+        }
+        OBD_FREE(req, sizeof(*req));
+}
+
 /*
- * Construct a fake ptlrpc_request to do the work, in order to
- * user the existing callback/wakeup facilities
+ * if returned non-NULL, reqbuf and repbuf will be take over by the
+ * request: the caller can't release them directly, instead should
+ * call rawrpc_req_finished().
  */
-int ptlrpc_do_rawrpc(struct obd_import *imp,
-                     char *reqbuf, int reqlen,
-                     char *repbuf, int *replenp,
-                     int timeout)
+struct ptlrpc_request *
+ptl_do_rawrpc(struct obd_import *imp,
+              char *reqbuf, int reqbuf_len, int reqlen,
+              char *repbuf, int repbuf_len, int *replenp,
+              int timeout, int *res)
 {
         struct ptlrpc_connection *conn;
-        struct ptlrpc_request request; /* just a fake one */
+        struct ptlrpc_request *request;
         ptl_handle_me_t reply_me_h;
         ptl_md_t reply_md, req_md;
         struct l_wait_info lwi;
@@ -602,120 +645,157 @@ int ptlrpc_do_rawrpc(struct obd_import *imp,
         ENTRY;
 
         LASSERT(imp);
-        class_import_get(imp);
-        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
-                CDEBUG(D_SEC, "raw rpc on closed imp(=>%s)? send anyway\n",
-                       imp->imp_target_uuid.uuid);
+        LASSERT(reqbuf && reqbuf_len);
+        LASSERT(repbuf && repbuf_len);
+        LASSERT(reqlen && reqlen <= reqbuf_len);
+
+        OBD_ALLOC(request, sizeof(*request));
+        if (!request) {
+                *res = -ENOMEM;
+                RETURN(NULL);
         }
 
+        imp = request->rq_import = class_import_get(imp);
         conn = imp->imp_connection;
 
+        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
+                CDEBUG(D_SEC, "raw rpc on closed imp(=>%s)\n",
+                       imp->imp_target_uuid.uuid);
+        }
+
         /* initialize request */
-        memset(&request, 0, sizeof(request));
-        request.rq_req_cbid.cbid_fn = request_out_callback;
-        request.rq_req_cbid.cbid_arg = &request;
-        request.rq_reply_cbid.cbid_fn  = reply_in_callback;
-        request.rq_reply_cbid.cbid_arg = &request;
-        request.rq_reqbuf = reqbuf;
-        request.rq_reqbuf_len = reqlen;
-        request.rq_repbuf = repbuf;
-        request.rq_repbuf_len = *replenp;
-        request.rq_set = NULL;
-        spin_lock_init(&request.rq_lock);
-        init_waitqueue_head(&request.rq_reply_waitq);
-        atomic_set(&request.rq_refcount, 1000000); /* never be droped */
-        request.rq_xid = ptlrpc_next_xid();
+        request->rq_req_cbid.cbid_fn = rawrpc_request_out_callback;
+        request->rq_req_cbid.cbid_arg = request;
+        request->rq_reply_cbid.cbid_fn  = reply_in_callback;
+        request->rq_reply_cbid.cbid_arg = request;
+        request->rq_reqbuf = reqbuf;
+        request->rq_reqbuf_len = reqbuf_len;
+        request->rq_repbuf = repbuf;
+        request->rq_repbuf_len = repbuf_len;
+        request->rq_set = NULL;
+        spin_lock_init(&request->rq_lock);
+        init_waitqueue_head(&request->rq_reply_waitq);
+        atomic_set(&request->rq_refcount, 1);
+        request->rq_xid = ptlrpc_next_xid();
 
         /* add into sending list */
         spin_lock_irqsave(&imp->imp_lock, irq_flags);
-        list_add_tail(&request.rq_list, &imp->imp_rawrpc_list);
+        list_add_tail(&request->rq_list, &imp->imp_rawrpc_list);
         spin_unlock_irqrestore(&imp->imp_lock, irq_flags);
 
         /* prepare reply buffer */
         rc = PtlMEAttach(conn->c_peer.peer_ni->pni_ni_h,
                          imp->imp_client->cli_reply_portal,
-                         conn->c_peer.peer_id, request.rq_xid, 0, PTL_UNLINK,
+                         conn->c_peer.peer_id, request->rq_xid, 0, PTL_UNLINK,
                          PTL_INS_AFTER, &reply_me_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMEAttach failed: %d\n", rc);
                 LASSERT (rc == PTL_NO_SPACE);
-                GOTO(cleanup_imp, rc = -ENOMEM);
+                GOTO(out, rc = -ENOMEM);
         }
 
-        spin_lock_irqsave(&request.rq_lock, irq_flags);
-        request.rq_receiving_reply = 1;
-        spin_unlock_irqrestore(&request.rq_lock, irq_flags);
+        spin_lock_irqsave(&request->rq_lock, irq_flags);
+        request->rq_receiving_reply = 1;
+        spin_unlock_irqrestore(&request->rq_lock, irq_flags);
 
         reply_md.start          = repbuf;
-        reply_md.length         = *replenp;
+        reply_md.length         = repbuf_len;
         reply_md.threshold      = 1;
         reply_md.options        = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT;
-        reply_md.user_ptr       = &request.rq_reply_cbid;
+        reply_md.user_ptr       = &request->rq_reply_cbid;
         reply_md.eq_handle      = conn->c_peer.peer_ni->pni_eq_h;
 
         rc = PtlMDAttach(reply_me_h, reply_md, PTL_UNLINK,
-                         &request.rq_reply_md_h);
+                          &request->rq_reply_md_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMDAttach failed: %d\n", rc);
                 LASSERT (rc == PTL_NO_SPACE);
                 GOTO(cleanup_me, rc = -ENOMEM);
         }
 
+        /*
+         * the extra 2 refcount will be balanced by out_callback
+         */
+        atomic_set(&request->rq_refcount, 3);
+
         /* prepare request buffer */
         req_md.start            = reqbuf;
         req_md.length           = reqlen;
         req_md.threshold        = 1;
         req_md.options          = PTLRPC_MD_OPTIONS;
-        req_md.user_ptr         = &request.rq_req_cbid;
+        req_md.user_ptr         = &request->rq_req_cbid;
         req_md.eq_handle        = conn->c_peer.peer_ni->pni_eq_h;
 
         rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h,
-                       req_md, PTL_UNLINK, &request.rq_req_md_h);
+                       req_md, PTL_UNLINK, &request->rq_req_md_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMDBind failed %d\n", rc);
                 LASSERT (rc == PTL_NO_SPACE);
+                atomic_set(&request->rq_refcount, 1);
                 GOTO(cleanup_me, rc = -ENOMEM);
         }
 
-        rc = PtlPut(request.rq_req_md_h, PTL_NOACK_REQ, conn->c_peer.peer_id,
-                    imp->imp_client->cli_request_portal,
-                    0, request.rq_xid, 0, 0);
+        rc = PtlPut(request->rq_req_md_h, PTL_NOACK_REQ, conn->c_peer.peer_id,
+                    imp->imp_client->cli_request_portal, 0,
+                    request->rq_xid, 0, 0);
         if (rc != PTL_OK) {
                 CERROR("PtlPut failed %d\n", rc);
                 GOTO(cleanup_md, rc);
         }
 
-        lwi = LWI_TIMEOUT(timeout * HZ, rawrpc_timedout, &request);
-        l_wait_event(request.rq_reply_waitq,
-                     rawrpc_check_reply(&request), &lwi);
-
-        ptlrpc_unregister_reply(&request);
-
-        if (request.rq_err || request.rq_resend || request.rq_intr ||
-            request.rq_timedout || !request.rq_replied) {
-                CERROR("secinit rpc error: err %d, resend %d, "
-                       "intr %d, timedout %d, replied %d\n",
-                        request.rq_err, request.rq_resend, request.rq_intr,
-                        request.rq_timedout, request.rq_replied);
-                if (request.rq_timedout)
-                        rc = -ETIMEDOUT;
-                else
-                        rc = -EINVAL;
-        } else {
-                *replenp = request.rq_nob_received;
+        if (timeout)
+                lwi = LWI_TIMEOUT(timeout * HZ, rawrpc_timedout, request);
+        else
+                lwi = LWI_TIMEOUT(100 * HZ, rawrpc_timedout_wait, request);
+
+        l_wait_event(request->rq_reply_waitq,
+                     rawrpc_check_reply(request), &lwi);
+
+        ptlrpc_unregister_reply(request);
+
+        if (request->rq_replied) {
+                *replenp = request->rq_nob_received;
                 rc = 0;
+        } else {
+                CERROR("rawrpc error: err %d, neterr %d, int %d, timedout %d\n",
+                        request->rq_err, request->rq_net_err,
+                        request->rq_intr, request->rq_timedout);
+
+                /* give timedout higher priority */
+                rc = request->rq_timedout ? -ETIMEDOUT : -EIO;
         }
-        GOTO(cleanup_imp, rc);
+
+out:
+        *res = rc;
+        RETURN(request);
 
 cleanup_md:
-        PtlMDUnlink(request.rq_req_md_h);
+        PtlMDUnlink(request->rq_req_md_h);
 cleanup_me:
         PtlMEUnlink(reply_me_h);
-cleanup_imp:
-        spin_lock_irqsave(&imp->imp_lock, irq_flags);
-        list_del_init(&request.rq_list);
-        spin_unlock_irqrestore(&imp->imp_lock, irq_flags);
+        goto out;
+}
 
-        class_import_put(imp);
-        RETURN(rc);
+/*
+ * caller will take care the reqbuf & repbuf, and only return
+ * when the rpc really finished on wire.
+ */
+int ptl_do_rawrpc_simple(struct obd_import *imp,
+                         char *reqbuf, int reqlen,
+                         char *repbuf, int *replenp)
+{
+        struct ptlrpc_request *req;
+        int res;
+
+        req = ptl_do_rawrpc(imp, reqbuf, reqlen, reqlen,
+                            repbuf, *replenp, replenp, 0, &res);
+
+        if (req == NULL)
+                return res;
+
+        req->rq_reqbuf = req->rq_repbuf = NULL;
+        req->rq_reqbuf_len = req->rq_repbuf_len = 0;
+
+        rawrpc_req_finished(req);
+        return res;
 }
index 786bdce..c92f34e 100644 (file)
@@ -92,7 +92,9 @@ EXPORT_SYMBOL(ptlrpc_reply);
 EXPORT_SYMBOL(ptlrpc_error);
 EXPORT_SYMBOL(ptlrpc_resend_req);
 EXPORT_SYMBOL(ptl_send_rpc);
-EXPORT_SYMBOL(ptlrpc_do_rawrpc);
+EXPORT_SYMBOL(ptl_do_rawrpc);
+EXPORT_SYMBOL(ptl_do_rawrpc_simple);
+EXPORT_SYMBOL(rawrpc_req_finished);
 
 /* client.c */
 EXPORT_SYMBOL(ptlrpc_init_client);
index d3cf0d1..c6f7eb7 100644 (file)
@@ -258,6 +258,7 @@ struct lgssd_ioctl_param {
 static int gss_send_secinit_rpc(__user char *buffer, unsigned long count)
 {
         struct obd_import        *imp;
+        struct ptlrpc_request    *request = NULL;
         struct lgssd_ioctl_param  param;
         const int                 reqbuf_size = 1024;
         const int                 repbuf_size = 1024;
@@ -319,10 +320,10 @@ static int gss_send_secinit_rpc(__user char *buffer, unsigned long count)
                 goto out_copy;
         }
 
-        replen = repbuf_size;
-        rc = ptlrpc_do_rawrpc(imp, reqbuf, reqlen,
-                              repbuf, &replen, SECINIT_RPC_TIMEOUT);
-        if (rc) {
+        request = ptl_do_rawrpc(imp, reqbuf, reqbuf_size, reqlen,
+                                repbuf, repbuf_size, &replen,
+                                SECINIT_RPC_TIMEOUT, &rc);
+        if (request == NULL || rc) {
                 param.status = rc;
                 goto out_copy;
         }
@@ -351,32 +352,15 @@ out_copy:
                 rc = 0;
 
         class_import_put(imp);
-        if (repbuf)
-                OBD_FREE(repbuf, repbuf_size);
-        if (reqbuf)
-                OBD_FREE(reqbuf, reqbuf_size);
-        RETURN(rc);
-}
-
-static int gss_send_secfini_rpc(struct obd_import *imp,
-                                char *reqbuf, int reqlen)
-{
-        const int repbuf_size = 1024;
-        char *repbuf;
-        int replen = repbuf_size;
-        int rc;
-
-        OBD_ALLOC(repbuf, repbuf_size);
-        if (!repbuf) {
-                CERROR("Out of memory\n");
-                return -ENOMEM;
+        if (request == NULL) {
+                if (repbuf)
+                        OBD_FREE(repbuf, repbuf_size);
+                if (reqbuf)
+                        OBD_FREE(reqbuf, reqbuf_size);
+        } else {
+                rawrpc_req_finished(request);
         }
-
-        rc = ptlrpc_do_rawrpc(imp, reqbuf, reqlen, repbuf, &replen,
-                              SECFINI_RPC_TIMEOUT);
-
-        OBD_FREE(repbuf, repbuf_size);
-        return rc;
+        RETURN(rc);
 }
 
 /**********************************************
@@ -1313,6 +1297,10 @@ static void destroy_gss_context(struct ptlrpc_cred *cred)
         struct ptlrpc_request    req;
         struct obd_import       *imp;
         __u32                   *vp, lmsg_size;
+        struct ptlrpc_request   *raw_req = NULL;
+        const int                repbuf_len = 256;
+        char                    *repbuf;
+        int                      replen, rc;
         ENTRY;
 
         /* cred's refcount is 0, steal one */
@@ -1379,10 +1367,21 @@ static void destroy_gss_context(struct ptlrpc_cred *cred)
         }
         atomic_dec(&cred->pc_refcount);
 
-        /* send out */
-        gss_send_secfini_rpc(imp, req.rq_reqbuf, req.rq_reqdata_len);
+        OBD_ALLOC(repbuf, repbuf_len);
+        if (!repbuf)
+                goto exit;
+
+        raw_req = ptl_do_rawrpc(imp, req.rq_reqbuf, req.rq_reqbuf_len,
+                                req.rq_reqdata_len, repbuf, repbuf_len, &replen,
+                                SECFINI_RPC_TIMEOUT, &rc);
+        if (!raw_req)
+                OBD_FREE(repbuf, repbuf_len);
+
 exit:
-        OBD_FREE(req.rq_reqbuf, req.rq_reqbuf_len);
+        if (raw_req == NULL)
+                OBD_FREE(req.rq_reqbuf, req.rq_reqbuf_len);
+        else
+                rawrpc_req_finished(raw_req);
         EXIT;
 }