From c311c73251dbc882f11c54cf9bb61f84fc2c5b20 Mon Sep 17 00:00:00 2001 From: shaver Date: Fri, 5 Dec 2003 14:45:23 +0000 Subject: [PATCH] b=1897: use the rpcd to send closes, so that we can resend in the case of a reconnect after user interruption, and avoid leaking an open-count. Also, allocate repmsg _before_ reconstructing a close into it. r=phik --- lustre/mdc/mdc_request.c | 61 ++++++++++++++++++++++++++++++++++++++++++++---- lustre/mds/mds_open.c | 14 +++++------ 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index d69f624..2c9787a 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -399,6 +399,34 @@ static void mdc_commit_close(struct ptlrpc_request *req) spin_unlock(&open_req->rq_lock); } +static int mdc_close_interpret(struct ptlrpc_request *req, void *data, int rc) +{ + union ptlrpc_async_args *aa = data; + struct mdc_rpc_lock *rpc_lock = aa->pointer_arg[0]; + mdc_put_rpc_lock(rpc_lock, NULL); + wake_up(&req->rq_reply_waitq); + RETURN(rc); +} + +/* We can't use ptlrpc_check_reply, because we don't want to wake up for + * anything but a reply or an error. */ +static int mdc_close_check_reply(struct ptlrpc_request *req) +{ + int rc = 0; + unsigned long flags; + + spin_lock_irqsave(&req->rq_lock, flags); + if (req->rq_replied || req->rq_err) + rc = 1; + spin_unlock_irqrestore (&req->rq_lock, flags); + return rc; +} + +static int go_back_to_sleep(void *unused) +{ + return 0; +} + int mdc_close(struct obd_export *exp, struct obdo *obdo, struct obd_client_handle *och, struct ptlrpc_request **request) { @@ -410,6 +438,8 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo, obd->u.cli.cl_max_mds_cookiesize}; struct ptlrpc_request *req; struct mdc_open_data *mod; + struct l_wait_info lwi; + struct mdc_rpc_lock *rpc_lock = obd->u.cli.cl_rpc_lock; ENTRY; req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize, @@ -441,14 +471,31 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo, LASSERT(req->rq_cb_data == NULL); req->rq_cb_data = mod; - mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL); - rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(obd->u.cli.cl_rpc_lock, NULL); - + /* We hand a ref to the rpcd here, so we need another one of our own. */ + ptlrpc_request_addref(req); + + mdc_get_rpc_lock(rpc_lock, NULL); + req->rq_interpret_reply = mdc_close_interpret; + req->rq_async_args.pointer_arg[0] = rpc_lock; + ptlrpcd_add_req(req); + lwi = LWI_TIMEOUT_INTR(MAX(req->rq_timeout * HZ, 1), go_back_to_sleep, + NULL, NULL); + rc = l_wait_event(req->rq_reply_waitq, mdc_close_check_reply(req), + &lwi); + if (mod == NULL && rc == 0) CERROR("Unexpected: can't find mdc_open_data, but the close " "succeeded. Please tell CFS.\n"); + if (rc == 0) { + rc = req->rq_repmsg->status; + if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) { + DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR"); + if (rc > 0) + rc = -rc; + } + } + EXIT; out: *request = req; @@ -741,7 +788,9 @@ static int mdc_setup(struct obd_device *obd, obd_count len, void *buf) if (!cli->cl_rpc_lock) RETURN(-ENOMEM); mdc_init_rpc_lock(cli->cl_rpc_lock); - + + ptlrpcd_addref(); + OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock)); if (!cli->cl_setattr_lock) GOTO(out_free_rpc, rc = -ENOMEM); @@ -823,6 +872,8 @@ static int mdc_cleanup(struct obd_device *obd, int flags) OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock)); OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock)); + ptlrpcd_decref(); + return client_obd_cleanup(obd, flags); } diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 1d9b134..d57978b 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -1116,7 +1116,13 @@ int mds_close(struct ptlrpc_request *req) obd->u.mds.mds_max_cookiesize}; ENTRY; - MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); + rc = lustre_pack_reply(req, 3, repsize, NULL); + if (rc) { + CERROR("lustre_pack_reply: rc = %d\n", rc); + req->rq_status = rc; + } else { + MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); + } body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body); if (body == NULL) { @@ -1136,12 +1142,6 @@ int mds_close(struct ptlrpc_request *req) RETURN(-ESTALE); } - rc = lustre_pack_reply(req, 3, repsize, NULL); - if (rc) { - CERROR("lustre_pack_reply: rc = %d\n", rc); - req->rq_status = rc; - } - inode = mfd->mfd_dentry->d_inode; if (mds_inode_is_orphan(inode) && mds_open_orphan_count(inode) == 1) { body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body)); -- 1.8.3.1