Whamcloud - gitweb
b=1897: use the rpcd to send closes, so that we can resend in the case of a
authorshaver <shaver>
Fri, 5 Dec 2003 14:45:23 +0000 (14:45 +0000)
committershaver <shaver>
Fri, 5 Dec 2003 14:45:23 +0000 (14:45 +0000)
        reconnect after user interruption, and avoid leaking an open-count.

        Also, allocate repmsg _before_ reconstructing a close into it.
r=phik

lustre/mdc/mdc_request.c
lustre/mds/mds_open.c

index d69f624..2c9787a 100644 (file)
@@ -399,6 +399,34 @@ static void mdc_commit_close(struct ptlrpc_request *req)
         spin_unlock(&open_req->rq_lock);
 }
 
+static int mdc_close_interpret(struct ptlrpc_request *req, void *data, int rc)
+{
+        union ptlrpc_async_args *aa = data;
+        struct mdc_rpc_lock *rpc_lock = aa->pointer_arg[0];
+        mdc_put_rpc_lock(rpc_lock, NULL);
+        wake_up(&req->rq_reply_waitq);
+        RETURN(rc);
+}
+
+/* We can't use ptlrpc_check_reply, because we don't want to wake up for 
+ * anything but a reply or an error. */
+static int mdc_close_check_reply(struct ptlrpc_request *req)
+{
+        int rc = 0;
+        unsigned long flags;
+
+        spin_lock_irqsave(&req->rq_lock, flags);
+        if (req->rq_replied || req->rq_err)
+                rc = 1;
+        spin_unlock_irqrestore (&req->rq_lock, flags);
+        return rc;
+}
+
+static int go_back_to_sleep(void *unused)
+{
+        return 0;
+}
+
 int mdc_close(struct obd_export *exp, struct obdo *obdo,
               struct obd_client_handle *och, struct ptlrpc_request **request)
 {
@@ -410,6 +438,8 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo,
                               obd->u.cli.cl_max_mds_cookiesize};
         struct ptlrpc_request *req;
         struct mdc_open_data *mod;
+        struct l_wait_info lwi;
+        struct mdc_rpc_lock *rpc_lock = obd->u.cli.cl_rpc_lock;
         ENTRY;
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize,
@@ -441,14 +471,31 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo,
         LASSERT(req->rq_cb_data == NULL);
         req->rq_cb_data = mod;
 
-        mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
-        rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
-
+        /* We hand a ref to the rpcd here, so we need another one of our own. */
+        ptlrpc_request_addref(req);
+
+        mdc_get_rpc_lock(rpc_lock, NULL);
+        req->rq_interpret_reply = mdc_close_interpret;
+        req->rq_async_args.pointer_arg[0] = rpc_lock;
+        ptlrpcd_add_req(req);
+        lwi = LWI_TIMEOUT_INTR(MAX(req->rq_timeout * HZ, 1), go_back_to_sleep,
+                               NULL, NULL);
+        rc = l_wait_event(req->rq_reply_waitq, mdc_close_check_reply(req),
+                          &lwi);
+        
         if (mod == NULL && rc == 0)
                 CERROR("Unexpected: can't find mdc_open_data, but the close "
                        "succeeded.  Please tell CFS.\n");
 
+        if (rc == 0) {
+                rc = req->rq_repmsg->status;
+                if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
+                        DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR");
+                        if (rc > 0)
+                                rc = -rc;
+                }
+        }
+
         EXIT;
  out:
         *request = req;
@@ -741,7 +788,9 @@ static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
         if (!cli->cl_rpc_lock)
                 RETURN(-ENOMEM);
         mdc_init_rpc_lock(cli->cl_rpc_lock);
-        
+
+        ptlrpcd_addref();
+
         OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
         if (!cli->cl_setattr_lock)
                 GOTO(out_free_rpc, rc = -ENOMEM);
@@ -823,6 +872,8 @@ static int mdc_cleanup(struct obd_device *obd, int flags)
         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
 
+        ptlrpcd_decref();
+
         return client_obd_cleanup(obd, flags);
 }
 
index 1d9b134..d57978b 100644 (file)
@@ -1116,7 +1116,13 @@ int mds_close(struct ptlrpc_request *req)
                               obd->u.mds.mds_max_cookiesize};
         ENTRY;
 
-        MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
+        rc = lustre_pack_reply(req, 3, repsize, NULL);
+        if (rc) {
+                CERROR("lustre_pack_reply: rc = %d\n", rc);
+                req->rq_status = rc;
+        } else {
+                MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
+        }
 
         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
         if (body == NULL) {
@@ -1136,12 +1142,6 @@ int mds_close(struct ptlrpc_request *req)
                 RETURN(-ESTALE);
         }
 
-        rc = lustre_pack_reply(req, 3, repsize, NULL);
-        if (rc) {
-                CERROR("lustre_pack_reply: rc = %d\n", rc);
-                req->rq_status = rc;
-        }
-
         inode = mfd->mfd_dentry->d_inode;
         if (mds_inode_is_orphan(inode) && mds_open_orphan_count(inode) == 1) {
                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));