From 52596effa8923821f1d3afbc9b4ca733baa76ca4 Mon Sep 17 00:00:00 2001 From: yury Date: Sat, 8 Nov 2008 10:43:30 +0000 Subject: [PATCH] b=17310 r=johann,shadow - make sure that no new inflight rpcs may come after ptlrpcd_deactivate_import() for both synchronous and asynchronous sending. To do so we make sure that imp_inflight++ is done only when permission is granted by ptlrpc_import_delay_req(), which decides whether req should be sent, deferred or killed as import is not in the state to send it in the observable future. For async sending, rpc is only counted inflight when it is added to sending or delaying list instead of just adding it to set for processing. This fixes an assert in ptlrpc_invalidate_import() and a number of other issues; - synchronize imp_inflight and the presence on sending or delaying list for ptlrpc_queue_wait() case. So that, now it is guaranteed that if imp_inflight != 0 we may always find hanging rpc either in sending or in delaying list; - make sure that in ptlrpc_queue_wait() we remove rpc from sending or delaying list and dec inflight only after ptlrpc_unregister_reply() is done. This way we make sure that accounting is correct. 
Rpc can't be returned to the pool or counted finished until lnet lets us go with finished reply unlink; - check for inflight and rq_list in pinger; - comments, cleanups; --- lustre/ptlrpc/client.c | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 00bd237..450a917 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -707,8 +707,6 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set, list_add_tail(&req->rq_set_chain, &set->set_requests); req->rq_set = set; set->set_remaining++; - - atomic_inc(&req->rq_import->imp_inflight); } /** @@ -1015,13 +1013,12 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) spin_unlock (&req->rq_lock); DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: " - "(%s != %s)", - lustre_msg_get_status(req->rq_reqmsg) , + "(%s != %s)", lustre_msg_get_status(req->rq_reqmsg), ptlrpc_import_state_name(req->rq_send_state), ptlrpc_import_state_name(imp->imp_state)); - LASSERT(list_empty (&req->rq_list)); - + LASSERT(list_empty(&req->rq_list)); list_add_tail(&req->rq_list, &imp->imp_delayed_list); + atomic_inc(&req->rq_import->imp_inflight); spin_unlock(&imp->imp_lock); RETURN(0); } @@ -1033,9 +1030,9 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) RETURN(rc); } - /* XXX this is the same as ptlrpc_queue_wait */ LASSERT(list_empty(&req->rq_list)); list_add_tail(&req->rq_list, &imp->imp_sending_list); + atomic_inc(&req->rq_import->imp_inflight); spin_unlock(&imp->imp_lock); lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid()); @@ -1308,9 +1305,14 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set) lustre_msg_get_opc(req->rq_reqmsg)); spin_lock(&imp->imp_lock); - if (!list_empty(&req->rq_list)) + /* Request already may be not on sending or delaying list. 
This + * may happen in the case of marking it erroneous when + * ptlrpc_import_delay_req(req, status) finds it impossible to + * allow sending this rpc and returns *status != 0. */ + if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); - atomic_dec(&imp->imp_inflight); + atomic_dec(&imp->imp_inflight); + } spin_unlock(&imp->imp_lock); set->set_remaining--; @@ -1934,7 +1936,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) LASSERT(req->rq_set == NULL); LASSERT(!req->rq_receiving_reply); - atomic_inc(&imp->imp_inflight); /* for distributed debugging */ lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid()); @@ -1954,8 +1955,8 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) restart: if (ptlrpc_import_delay_req(imp, req, &rc)) { list_del(&req->rq_list); - list_add_tail(&req->rq_list, &imp->imp_delayed_list); + atomic_inc(&imp->imp_inflight); spin_unlock(&imp->imp_lock); DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)", @@ -1975,6 +1976,7 @@ restart: spin_lock(&imp->imp_lock); list_del_init(&req->rq_list); + atomic_dec(&imp->imp_inflight); if (req->rq_err) { /* rq_status was set locally */ @@ -1993,7 +1995,6 @@ restart: } if (rc != 0) { - list_del_init(&req->rq_list); spin_unlock(&imp->imp_lock); req->rq_status = rc; // XXX this ok? GOTO(out, rc); @@ -2021,6 +2022,7 @@ restart: /* XXX this is the same as ptlrpc_set_wait */ LASSERT(list_empty(&req->rq_list)); list_add_tail(&req->rq_list, &imp->imp_sending_list); + atomic_inc(&imp->imp_inflight); spin_unlock(&imp->imp_lock); rc = ptl_send_rpc(req, 0); @@ -2052,15 +2054,17 @@ restart: lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, libcfs_nid2str(imp->imp_connection->c_peer.nid), lustre_msg_get_opc(req->rq_reqmsg)); - spin_lock(&imp->imp_lock); - list_del_init(&req->rq_list); - spin_unlock(&imp->imp_lock); /* If the reply was received normally, this just grabs the spinlock * (ensuring the reply callback has returned), sees that * req->rq_receiving_reply is clear and returns. 
*/ ptlrpc_unregister_reply(req, 0); + spin_lock(&imp->imp_lock); + list_del_init(&req->rq_list); + atomic_dec(&imp->imp_inflight); + spin_unlock(&imp->imp_lock); + if (req->rq_err) { DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d", rc, req->rq_status); @@ -2130,8 +2134,6 @@ restart: LASSERT(!req->rq_receiving_reply); ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - - atomic_dec(&imp->imp_inflight); cfs_waitq_signal(&imp->imp_recovery_waitq); RETURN(rc); } -- 1.8.3.1