From 051133e88fb43ce773be65833a70a2d24dc36bda Mon Sep 17 00:00:00 2001 From: ericm Date: Tue, 7 Jun 2005 00:53:04 +0000 Subject: [PATCH] fix rawrpc, don't drop buffer before rpc really finished on wire, portals callback might arrive very late. --- lustre/include/linux/lustre_net.h | 12 ++- lustre/ptlrpc/events.c | 15 ++- lustre/ptlrpc/niobuf.c | 210 ++++++++++++++++++++++++++------------ lustre/ptlrpc/ptlrpc_module.c | 4 +- lustre/sec/gss/sec_gss.c | 63 ++++++------ 5 files changed, 203 insertions(+), 101 deletions(-) diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index f791734..7e1aa52 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -559,6 +559,7 @@ extern struct ptlrpc_ni ptlrpc_interfaces[]; extern int ptlrpc_ninterfaces; extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid, struct ptlrpc_peer *peer); extern void request_out_callback (ptl_event_t *ev); +extern void rawrpc_request_out_callback(ptl_event_t *ev); extern void reply_in_callback(ptl_event_t *ev); extern void client_bulk_callback (ptl_event_t *ev); extern void request_in_callback(ptl_event_t *ev); @@ -600,8 +601,15 @@ int ptlrpc_error(struct ptlrpc_request *req); void ptlrpc_resend_req(struct ptlrpc_request *request); int ptl_send_rpc(struct ptlrpc_request *request); int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd); -int ptlrpc_do_rawrpc(struct obd_import *imp, char *reqbuf, int reqlen, - char *repbuf, int *replenp, int timeout); + +struct ptlrpc_request * ptl_do_rawrpc(struct obd_import *imp, + char *reqbuf, int reqbuf_len, int reqlen, + char *repbuf, int repbuf_len, + int *replenp, int timeout, int *res); +int ptl_do_rawrpc_simple(struct obd_import *imp, + char *reqbuf, int reqlen, + char *repbuf, int *replenp); +void rawrpc_req_finished(struct ptlrpc_request *req); /* ptlrpc/client.c */ void ptlrpc_init_client(int req_portal, int rep_portal, char *name, diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 20fde70..c39a7a2 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -332,6 +332,18 @@ void server_bulk_callback (ptl_event_t *ev) EXIT; } +void rawrpc_request_out_callback(ptl_event_t *ev) +{ + struct ptlrpc_cb_id *cbid = ev->md.user_ptr; + struct ptlrpc_request *req = cbid->cbid_arg; + + /* we suppose request_out_callback() will drop the + * reference by 1 + */ + request_out_callback(ev); + rawrpc_req_finished(req); +} + static void ptlrpc_master_callback(ptl_event_t *ev) { struct ptlrpc_cb_id *cbid = ev->md.user_ptr; @@ -344,7 +356,8 @@ static void ptlrpc_master_callback(ptl_event_t *ev) callback == client_bulk_callback || callback == request_in_callback || callback == reply_out_callback || - callback == server_bulk_callback); + callback == server_bulk_callback || + callback == rawrpc_request_out_callback); callback (ev); } diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 7cdb908..f9b4ae0 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -557,6 +557,11 @@ int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd) return (-ENOMEM); } + +/******************************************** + * rawrpc stuff, currently only used by gss * + ********************************************/ + static int rawrpc_timedout(void *data) { struct ptlrpc_request *req = (struct ptlrpc_request *) data; @@ -570,6 +575,11 @@ static int rawrpc_timedout(void *data) return 1; } +static int rawrpc_timedout_wait(void *data) +{ + return 0; +} + /* to make things as simple as possible */ static int rawrpc_check_reply(struct ptlrpc_request *req) { @@ -583,17 +593,50 @@ static int rawrpc_check_reply(struct ptlrpc_request *req) return rc; } +void rawrpc_req_finished(struct ptlrpc_request *req) +{ + struct obd_import *imp; + unsigned long irq_flags; + + if (!req) + return; + + if (!atomic_dec_and_test(&req->rq_refcount)) + return; + + LASSERT(req->rq_import); + imp = req->rq_import; + + spin_lock_irqsave(&imp->imp_lock, irq_flags); + list_del_init(&req->rq_list); + spin_unlock_irqrestore(&imp->imp_lock, irq_flags); + + class_import_put(imp); + + if (req->rq_reqbuf) { + LASSERT(req->rq_reqbuf_len); + OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len); + } + if (req->rq_repbuf) { + LASSERT(req->rq_repbuf_len); + OBD_FREE(req->rq_repbuf, req->rq_repbuf_len); + } + OBD_FREE(req, sizeof(*req)); +} + /* - * Construct a fake ptlrpc_request to do the work, in order to - * user the existing callback/wakeup facilities + * if returned non-NULL, reqbuf and repbuf will be take over by the + * request: the caller can't release them directly, instead should + * call rawrpc_req_finished(). */ -int ptlrpc_do_rawrpc(struct obd_import *imp, - char *reqbuf, int reqlen, - char *repbuf, int *replenp, - int timeout) +struct ptlrpc_request * +ptl_do_rawrpc(struct obd_import *imp, + char *reqbuf, int reqbuf_len, int reqlen, + char *repbuf, int repbuf_len, int *replenp, + int timeout, int *res) { struct ptlrpc_connection *conn; - struct ptlrpc_request request; /* just a fake one */ + struct ptlrpc_request *request; ptl_handle_me_t reply_me_h; ptl_md_t reply_md, req_md; struct l_wait_info lwi; @@ -602,120 +645,157 @@ int ptlrpc_do_rawrpc(struct obd_import *imp, ENTRY; LASSERT(imp); - class_import_get(imp); - if (imp->imp_state == LUSTRE_IMP_CLOSED) { - CDEBUG(D_SEC, "raw rpc on closed imp(=>%s)? send anyway\n", - imp->imp_target_uuid.uuid); + LASSERT(reqbuf && reqbuf_len); + LASSERT(repbuf && repbuf_len); + LASSERT(reqlen && reqlen <= reqbuf_len); + + OBD_ALLOC(request, sizeof(*request)); + if (!request) { + *res = -ENOMEM; + RETURN(NULL); } + imp = request->rq_import = class_import_get(imp); conn = imp->imp_connection; + if (imp->imp_state == LUSTRE_IMP_CLOSED) { + CDEBUG(D_SEC, "raw rpc on closed imp(=>%s)\n", + imp->imp_target_uuid.uuid); + } + /* initialize request */ - memset(&request, 0, sizeof(request)); - request.rq_req_cbid.cbid_fn = request_out_callback; - request.rq_req_cbid.cbid_arg = &request; - request.rq_reply_cbid.cbid_fn = reply_in_callback; - request.rq_reply_cbid.cbid_arg = &request; - request.rq_reqbuf = reqbuf; - request.rq_reqbuf_len = reqlen; - request.rq_repbuf = repbuf; - request.rq_repbuf_len = *replenp; - request.rq_set = NULL; - spin_lock_init(&request.rq_lock); - init_waitqueue_head(&request.rq_reply_waitq); - atomic_set(&request.rq_refcount, 1000000); /* never be droped */ - request.rq_xid = ptlrpc_next_xid(); + request->rq_req_cbid.cbid_fn = rawrpc_request_out_callback; + request->rq_req_cbid.cbid_arg = request; + request->rq_reply_cbid.cbid_fn = reply_in_callback; + request->rq_reply_cbid.cbid_arg = request; + request->rq_reqbuf = reqbuf; + request->rq_reqbuf_len = reqbuf_len; + request->rq_repbuf = repbuf; + request->rq_repbuf_len = repbuf_len; + request->rq_set = NULL; + spin_lock_init(&request->rq_lock); + init_waitqueue_head(&request->rq_reply_waitq); + atomic_set(&request->rq_refcount, 1); + request->rq_xid = ptlrpc_next_xid(); /* add into sending list */ spin_lock_irqsave(&imp->imp_lock, irq_flags); - list_add_tail(&request.rq_list, &imp->imp_rawrpc_list); + list_add_tail(&request->rq_list, &imp->imp_rawrpc_list); spin_unlock_irqrestore(&imp->imp_lock, irq_flags); /* prepare reply buffer */ rc = PtlMEAttach(conn->c_peer.peer_ni->pni_ni_h, imp->imp_client->cli_reply_portal, - conn->c_peer.peer_id, request.rq_xid, 0, PTL_UNLINK, + conn->c_peer.peer_id, request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER, &reply_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); LASSERT (rc == PTL_NO_SPACE); - GOTO(cleanup_imp, rc = -ENOMEM); + GOTO(out, rc = -ENOMEM); } - spin_lock_irqsave(&request.rq_lock, irq_flags); - request.rq_receiving_reply = 1; - spin_unlock_irqrestore(&request.rq_lock, irq_flags); + spin_lock_irqsave(&request->rq_lock, irq_flags); + request->rq_receiving_reply = 1; + spin_unlock_irqrestore(&request->rq_lock, irq_flags); reply_md.start = repbuf; - reply_md.length = *replenp; + reply_md.length = repbuf_len; reply_md.threshold = 1; reply_md.options = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT; - reply_md.user_ptr = &request.rq_reply_cbid; + reply_md.user_ptr = &request->rq_reply_cbid; reply_md.eq_handle = conn->c_peer.peer_ni->pni_eq_h; rc = PtlMDAttach(reply_me_h, reply_md, PTL_UNLINK, - &request.rq_reply_md_h); + &request->rq_reply_md_h); if (rc != PTL_OK) { CERROR("PtlMDAttach failed: %d\n", rc); LASSERT (rc == PTL_NO_SPACE); GOTO(cleanup_me, rc = -ENOMEM); } + /* + * the extra 2 refcount will be balanced by out_callback + */ + atomic_set(&request->rq_refcount, 3); + /* prepare request buffer */ req_md.start = reqbuf; req_md.length = reqlen; req_md.threshold = 1; req_md.options = PTLRPC_MD_OPTIONS; - req_md.user_ptr = &request.rq_req_cbid; + req_md.user_ptr = &request->rq_req_cbid; req_md.eq_handle = conn->c_peer.peer_ni->pni_eq_h; rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h, - req_md, PTL_UNLINK, &request.rq_req_md_h); + req_md, PTL_UNLINK, &request->rq_req_md_h); if (rc != PTL_OK) { CERROR("PtlMDBind failed %d\n", rc); LASSERT (rc == PTL_NO_SPACE); + atomic_set(&request->rq_refcount, 1); GOTO(cleanup_me, rc = -ENOMEM); } - rc = PtlPut(request.rq_req_md_h, PTL_NOACK_REQ, conn->c_peer.peer_id, - imp->imp_client->cli_request_portal, - 0, request.rq_xid, 0, 0); + rc = PtlPut(request->rq_req_md_h, PTL_NOACK_REQ, conn->c_peer.peer_id, + imp->imp_client->cli_request_portal, 0, + request->rq_xid, 0, 0); if (rc != PTL_OK) { CERROR("PtlPut failed %d\n", rc); GOTO(cleanup_md, rc); } - lwi = LWI_TIMEOUT(timeout * HZ, rawrpc_timedout, &request); - l_wait_event(request.rq_reply_waitq, - rawrpc_check_reply(&request), &lwi); - - ptlrpc_unregister_reply(&request); - - if (request.rq_err || request.rq_resend || request.rq_intr || - request.rq_timedout || !request.rq_replied) { - CERROR("secinit rpc error: err %d, resend %d, " - "intr %d, timedout %d, replied %d\n", - request.rq_err, request.rq_resend, request.rq_intr, - request.rq_timedout, request.rq_replied); - if (request.rq_timedout) - rc = -ETIMEDOUT; - else - rc = -EINVAL; - } else { - *replenp = request.rq_nob_received; + if (timeout) + lwi = LWI_TIMEOUT(timeout * HZ, rawrpc_timedout, request); + else + lwi = LWI_TIMEOUT(100 * HZ, rawrpc_timedout_wait, request); + + l_wait_event(request->rq_reply_waitq, + rawrpc_check_reply(request), &lwi); + + ptlrpc_unregister_reply(request); + + if (request->rq_replied) { + *replenp = request->rq_nob_received; rc = 0; + } else { + CERROR("rawrpc error: err %d, neterr %d, int %d, timedout %d\n", + request->rq_err, request->rq_net_err, + request->rq_intr, request->rq_timedout); + + /* give timedout higher priority */ + rc = request->rq_timedout ? -ETIMEDOUT : -EIO; } - GOTO(cleanup_imp, rc); + +out: + *res = rc; + RETURN(request); cleanup_md: - PtlMDUnlink(request.rq_req_md_h); + PtlMDUnlink(request->rq_req_md_h); cleanup_me: PtlMEUnlink(reply_me_h); -cleanup_imp: - spin_lock_irqsave(&imp->imp_lock, irq_flags); - list_del_init(&request.rq_list); - spin_unlock_irqrestore(&imp->imp_lock, irq_flags); + goto out; +} - class_import_put(imp); - RETURN(rc); +/* + * caller will take care the reqbuf & repbuf, and only return + * when the rpc really finished on wire. + */ +int ptl_do_rawrpc_simple(struct obd_import *imp, + char *reqbuf, int reqlen, + char *repbuf, int *replenp) +{ + struct ptlrpc_request *req; + int res; + + req = ptl_do_rawrpc(imp, reqbuf, reqlen, reqlen, + repbuf, *replenp, replenp, 0, &res); + + if (req == NULL) + return res; + + req->rq_reqbuf = req->rq_repbuf = NULL; + req->rq_reqbuf_len = req->rq_repbuf_len = 0; + + rawrpc_req_finished(req); + return res; } diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index 786bdce..c92f34e 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -92,7 +92,9 @@ EXPORT_SYMBOL(ptlrpc_reply); EXPORT_SYMBOL(ptlrpc_error); EXPORT_SYMBOL(ptlrpc_resend_req); EXPORT_SYMBOL(ptl_send_rpc); -EXPORT_SYMBOL(ptlrpc_do_rawrpc); +EXPORT_SYMBOL(ptl_do_rawrpc); +EXPORT_SYMBOL(ptl_do_rawrpc_simple); +EXPORT_SYMBOL(rawrpc_req_finished); /* client.c */ EXPORT_SYMBOL(ptlrpc_init_client); diff --git a/lustre/sec/gss/sec_gss.c b/lustre/sec/gss/sec_gss.c index d3cf0d1..c6f7eb7 100644 --- a/lustre/sec/gss/sec_gss.c +++ b/lustre/sec/gss/sec_gss.c @@ -258,6 +258,7 @@ struct lgssd_ioctl_param { static int gss_send_secinit_rpc(__user char *buffer, unsigned long count) { struct obd_import *imp; + struct ptlrpc_request *request = NULL; struct lgssd_ioctl_param param; const int reqbuf_size = 1024; const int repbuf_size = 1024; @@ -319,10 +320,10 @@ static int gss_send_secinit_rpc(__user char *buffer, unsigned long count) goto out_copy; } - replen = repbuf_size; - rc = ptlrpc_do_rawrpc(imp, reqbuf, reqlen, - repbuf, &replen, SECINIT_RPC_TIMEOUT); - if (rc) { + request = ptl_do_rawrpc(imp, reqbuf, reqbuf_size, reqlen, + repbuf, repbuf_size, &replen, + SECINIT_RPC_TIMEOUT, &rc); + if (request == NULL || rc) { param.status = rc; goto out_copy; } @@ -351,32 +352,15 @@ out_copy: rc = 0; class_import_put(imp); - if (repbuf) - OBD_FREE(repbuf, repbuf_size); - if (reqbuf) - OBD_FREE(reqbuf, reqbuf_size); - RETURN(rc); -} - -static int gss_send_secfini_rpc(struct obd_import *imp, - char *reqbuf, int reqlen) -{ - const int repbuf_size = 1024; - char *repbuf; - int replen = repbuf_size; - int rc; - - OBD_ALLOC(repbuf, repbuf_size); - if (!repbuf) { - CERROR("Out of memory\n"); - return -ENOMEM; + if (request == NULL) { + if (repbuf) + OBD_FREE(repbuf, repbuf_size); + if (reqbuf) + OBD_FREE(reqbuf, reqbuf_size); + } else { + rawrpc_req_finished(request); } - - rc = ptlrpc_do_rawrpc(imp, reqbuf, reqlen, repbuf, &replen, - SECFINI_RPC_TIMEOUT); - - OBD_FREE(repbuf, repbuf_size); - return rc; + RETURN(rc); } /********************************************** @@ -1313,6 +1297,10 @@ static void destroy_gss_context(struct ptlrpc_cred *cred) struct ptlrpc_request req; struct obd_import *imp; __u32 *vp, lmsg_size; + struct ptlrpc_request *raw_req = NULL; + const int repbuf_len = 256; + char *repbuf; + int replen, rc; ENTRY; /* cred's refcount is 0, steal one */ @@ -1379,10 +1367,21 @@ static void destroy_gss_context(struct ptlrpc_cred *cred) } atomic_dec(&cred->pc_refcount); - /* send out */ - gss_send_secfini_rpc(imp, req.rq_reqbuf, req.rq_reqdata_len); + OBD_ALLOC(repbuf, repbuf_len); + if (!repbuf) + goto exit; + + raw_req = ptl_do_rawrpc(imp, req.rq_reqbuf, req.rq_reqbuf_len, + req.rq_reqdata_len, repbuf, repbuf_len, &replen, + SECFINI_RPC_TIMEOUT, &rc); + if (!raw_req) + OBD_FREE(repbuf, repbuf_len); + exit: - OBD_FREE(req.rq_reqbuf, req.rq_reqbuf_len); + if (raw_req == NULL) + OBD_FREE(req.rq_reqbuf, req.rq_reqbuf_len); + else + rawrpc_req_finished(raw_req); EXIT; } -- 1.8.3.1