From 8599341c19f431ebd67d09bbafc652cefea2155f Mon Sep 17 00:00:00 2001
From: yury
Date: Thu, 6 Nov 2008 07:32:43 +0000
Subject: [PATCH] b=17310 r=johann,shadow

- Fix ptlrpcd blocking on very long reply-unlink waits. To do so, introduce
  a new RPC phase, RQ_PHASE_UNREGISTERING, in which a request stays until
  LNet calls reply_in_callback() to signal that the reply is unlinked.
  ptlrpcd skips all requests in this phase instead of waiting n * 300s on
  each of them, which lets it keep processing the other RPCs in the set;

- Make sure the inflight count is coherent with presence on the sending or
  delayed list; that is, if inflight != 0, the RPC must be on one of those
  lists. This is very helpful in ptlrpc_invalidate_import() for showing all
  RPCs still waiting after the import is invalidated;

- In ptlrpc_invalidate_import(), wait for the maximal (rq_deadline - now)
  over all inflight RPCs instead of obd_timeout, which may be much longer.
  If the calculated timeout is 0, obd_timeout is used. This fixes the case
  where rq_deadline - now > obd_timeout (very easy to see in logs), which
  led to an inflight != 0 assertion failure because inflight RPCs timed out
  after our wait period had already finished;

- In ptlrpc_invalidate_import(), wait forever for RPCs in the UNREGISTERING
  phase. When the wait times out, assert inflight == 0 only if no RPCs are
  in the UNREGISTERING phase; only requests in that phase are allowed to
  stay longer than obd_timeout;

- Add the ptlrpc_rqphase_move() function and route all phase changes
  through it. It calls DEBUG_REQ() there to track down all phase changes;

- Add conf_sanity.sh test_45 to emulate a very long reply unlink, and also
  the situation where rq_deadline - now > obd_timeout;

- Fix the use of rq_timedout in DEBUG_REQ();

- Do not wait forever in ptlrpc_unregister_reply() in the async case (when
  it is called from sets). The sync case is left unchanged.
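For reviewers, a minimal userspace model of the two central mechanisms may
help: routing every phase change through ptlrpc_rqphase_move() so it can be
logged, and parking a request in RQ_PHASE_UNREGISTERING until the network
callback reports the reply buffer unlinked. This is an illustrative sketch
only, not the kernel code: the struct fields are trimmed to what the sketch
needs, and printf() stands in for DEBUG_REQ().

    #include <stdio.h>

    enum rq_phase {
            RQ_PHASE_NEW,
            RQ_PHASE_RPC,
            RQ_PHASE_BULK,
            RQ_PHASE_INTERPRET,
            RQ_PHASE_COMPLETE,
            RQ_PHASE_UNREGISTERING,
            RQ_PHASE_UNDEFINED
    };

    struct ptlrpc_request {
            enum rq_phase rq_phase;      /* current phase */
            enum rq_phase rq_next_phase; /* phase to resume once unlink is done */
            int rq_receiving_reply;      /* reply buffer still posted */
            int rq_must_unlink;          /* unlink not yet confirmed by LNet */
    };

    /* All phase changes funnel through here so each one can be traced. */
    static void ptlrpc_rqphase_move(struct ptlrpc_request *req,
                                    enum rq_phase new_phase)
    {
            if (req->rq_phase == new_phase)
                    return;
            /* Stands in for the DEBUG_REQ() call in the real function. */
            printf("req %p: phase %d -> %d\n",
                   (void *)req, req->rq_phase, new_phase);
            req->rq_phase = new_phase;
    }

    /* Mirrors ptlrpc_client_recv_or_unlink(): is the reply still in flight? */
    static int recv_or_unlink(struct ptlrpc_request *req)
    {
            return req->rq_receiving_reply || req->rq_must_unlink;
    }

    int main(void)
    {
            struct ptlrpc_request req = {
                    .rq_phase       = RQ_PHASE_RPC,
                    .rq_next_phase  = RQ_PHASE_UNDEFINED,
                    .rq_must_unlink = 1
            };

            /* Async unregister: remember where to resume, park the request. */
            req.rq_next_phase = RQ_PHASE_INTERPRET;
            ptlrpc_rqphase_move(&req, RQ_PHASE_UNREGISTERING);

            /* Set-processing loop: skip this request while the unlink is
             * outstanding instead of blocking the whole set on it. */
            if (recv_or_unlink(&req))
                    printf("still unlinking - skip, process other RPCs\n");

            /* reply_in_callback() fires: unlink confirmed, resume. */
            req.rq_must_unlink = 0;
            if (!recv_or_unlink(&req))
                    ptlrpc_rqphase_move(&req, req.rq_next_phase);
            return 0;
    }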
---
 lustre/ptlrpc/client.c | 219 ++++++++++++++++++++++++++++++-------------------
 1 file changed, 136 insertions(+), 83 deletions(-)

diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 3aa63ff..70c852b 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -578,7 +578,9 @@ ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
         request->rq_reply_cbid.cbid_arg = request;
 
+        request->rq_reply_deadline = 0;
         request->rq_phase = RQ_PHASE_NEW;
+        request->rq_next_phase = RQ_PHASE_UNDEFINED;
 
         request->rq_request_portal = imp->imp_client->cli_request_portal;
         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
@@ -810,7 +812,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         /* serialise with network callback */
         spin_lock(&req->rq_lock);
 
-        if (req->rq_replied) {
+        if (ptlrpc_client_replied(req)) {
                 what = "REPLIED: ";
                 GOTO(out, rc = 1);
         }
@@ -818,7 +820,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         if (req->rq_net_err && !req->rq_timedout) {
                 what = "NETERR: ";
                 spin_unlock(&req->rq_lock);
-                rc = ptlrpc_expire_one_request(req);
+                rc = ptlrpc_expire_one_request(req, 0);
                 spin_lock(&req->rq_lock);
                 GOTO(out, rc);
         }
@@ -838,7 +840,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
                 GOTO(out, rc = 1);
         }
 
-        if (req->rq_early) {
+        if (ptlrpc_client_early(req)) {
                 what = "EARLYREP: ";
                 ptlrpc_at_recv_early_reply(req);
                 GOTO(out, rc = 0); /* keep waiting */
@@ -999,7 +1001,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         if (req->rq_sent && (req->rq_sent > CURRENT_SECONDS))
                 RETURN (0);
 
-        req->rq_phase = RQ_PHASE_RPC;
+        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
         imp = req->rq_import;
         spin_lock(&imp->imp_lock);
@@ -1026,7 +1028,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         if (rc != 0) {
                 spin_unlock(&imp->imp_lock);
                 req->rq_status = rc;
-                req->rq_phase = RQ_PHASE_INTERPRET;
+                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                 RETURN(rc);
         }
 
@@ -1072,6 +1074,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                     ptlrpc_send_new_req(req)) {
                         force_timer_recalc = 1;
                 }
 
+                /* delayed send - skip */
                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
                         continue;
 
@@ -1079,30 +1082,53 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (!(req->rq_phase == RQ_PHASE_RPC ||
                       req->rq_phase == RQ_PHASE_BULK ||
                       req->rq_phase == RQ_PHASE_INTERPRET ||
+                      req->rq_phase == RQ_PHASE_UNREGISTERING ||
                       req->rq_phase == RQ_PHASE_COMPLETE)) {
                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
                         LBUG();
                 }
 
+                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+                        LASSERT(req->rq_next_phase != req->rq_phase);
+                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+                        /* Skip processing until reply is unlinked. We
+                         * can't return to pool before that and we can't
+                         * call interpret before that. We need to make
+                         * sure that all rdma transfers finished and will
+                         * not corrupt any data. */
+                        if (ptlrpc_client_recv_or_unlink(req))
+                                continue;
+
+                        /* Turn fail_loc off to prevent it from looping
+                         * forever. */
+                        OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK |
+                                       OBD_FAIL_ONCE);
+
+                        /* Move to next phase if reply was successfully
+                         * unlinked. */
+                        ptlrpc_rqphase_move(req, req->rq_next_phase);
+                }
+
                 if (req->rq_phase == RQ_PHASE_COMPLETE)
                         continue;
 
                 if (req->rq_phase == RQ_PHASE_INTERPRET)
                         GOTO(interpret, req->rq_status);
 
-                if (req->rq_net_err && !req->rq_timedout)
-                        ptlrpc_expire_one_request(req);
+                /* Note that this also will start async reply unlink */
+                if (req->rq_net_err && !req->rq_timedout) {
+                        ptlrpc_expire_one_request(req, 1);
+
+                        /* Check if we still need to wait for unlink. */
+                        if (ptlrpc_client_recv_or_unlink(req))
+                                continue;
+                }
 
                 if (req->rq_err) {
-                        ptlrpc_unregister_reply(req);
                         if (req->rq_status == 0)
                                 req->rq_status = -EIO;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
-
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
@@ -1112,15 +1138,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                  * seen a timeout. our policy is to only interpret
                  * interrupted rpcs after they have timed out */
                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
-                        /* NB could be on delayed list */
-                        ptlrpc_unregister_reply(req);
                         req->rq_status = -EINTR;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
-
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
@@ -1128,7 +1147,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
                         int status;
 
-                        ptlrpc_unregister_reply(req);
+                        if (!ptlrpc_unregister_reply(req, 1))
+                                continue;
 
                         spin_lock(&imp->imp_lock);
 
@@ -1137,19 +1157,22 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                 continue;
                         }
 
-                        list_del_init(&req->rq_list);
                         if (status != 0) {
                                 req->rq_status = status;
-                                req->rq_phase = RQ_PHASE_INTERPRET;
+                                ptlrpc_rqphase_move(req,
+                                                    RQ_PHASE_INTERPRET);
                                 spin_unlock(&imp->imp_lock);
                                 GOTO(interpret, req->rq_status);
                         }
                         if (req->rq_no_resend) {
                                 req->rq_status = -ENOTCONN;
-                                req->rq_phase = RQ_PHASE_INTERPRET;
+                                ptlrpc_rqphase_move(req,
+                                                    RQ_PHASE_INTERPRET);
                                 spin_unlock(&imp->imp_lock);
                                 GOTO(interpret, req->rq_status);
                         }
+
+                        list_del_init(&req->rq_list);
                         list_add_tail(&req->rq_list,
                                       &imp->imp_sending_list);
 
@@ -1166,7 +1189,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         if (req->rq_bulk) {
                                 __u64 old_xid = req->rq_xid;
 
-                                ptlrpc_unregister_bulk (req);
+                                ptlrpc_unregister_bulk(req);
 
                                 /* ensure previous bulk fails */
                                 req->rq_xid = ptlrpc_next_xid();
@@ -1190,36 +1213,33 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 spin_lock(&req->rq_lock);
 
-                if (req->rq_early) {
+                if (ptlrpc_client_early(req)) {
                         ptlrpc_at_recv_early_reply(req);
                         spin_unlock(&req->rq_lock);
                         continue;
                 }
 
                 /* Still waiting for a reply? */
-                if (req->rq_receiving_reply) {
+                if (ptlrpc_client_recv(req)) {
                         spin_unlock(&req->rq_lock);
                         continue;
                 }
 
                 /* Did we actually receive a reply? */
-                if (!req->rq_replied) {
+                if (!ptlrpc_client_replied(req)) {
                         spin_unlock(&req->rq_lock);
                         continue;
                 }
 
                 spin_unlock(&req->rq_lock);
 
-                spin_lock(&imp->imp_lock);
-                list_del_init(&req->rq_list);
-                spin_unlock(&imp->imp_lock);
-
                 req->rq_status = after_reply(req);
                 if (req->rq_resend) {
                         /* Add this req to the delayed list so
                            it can be errored if the import is
                            evicted after recovery. */
                         spin_lock(&imp->imp_lock);
+                        list_del_init(&req->rq_list);
                         list_add_tail(&req->rq_list,
                                       &imp->imp_delayed_list);
                         spin_unlock(&imp->imp_lock);
@@ -1228,15 +1248,15 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 /* If there is no bulk associated with this request,
                  * then we're done and should let the interpreter
-                 * process the reply. Similarly if the RPC returned
+                 * process the reply.  Similarly if the RPC returned
                  * an error, and therefore the bulk will never arrive. */
                 if (req->rq_bulk == NULL || req->rq_status != 0) {
-                        req->rq_phase = RQ_PHASE_INTERPRET;
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
-                req->rq_phase = RQ_PHASE_BULK;
+                ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
         }
 
         LASSERT(req->rq_phase == RQ_PHASE_BULK);
@@ -1250,19 +1270,26 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                          * the ACK for her PUT. */
                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                         req->rq_status = -EIO;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
-                req->rq_phase = RQ_PHASE_INTERPRET;
+                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
         interpret:
                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
-                LASSERT(!req->rq_receiving_reply);
 
-                ptlrpc_unregister_reply(req);
+                /* This moves to "unregistering" phase we need to wait for
+                 * reply unlink. */
+                if (!ptlrpc_unregister_reply(req, 1))
+                        continue;
+
                 if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk (req);
+                        ptlrpc_unregister_bulk(req);
+
+                /* When calling interpret receiving already should be
+                 * finished. */
+                LASSERT(!req->rq_receiving_reply);
 
                 if (req->rq_interpret_reply != NULL) {
                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
@@ -1270,7 +1297,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         req->rq_status = interpreter(req, &req->rq_async_args,
                                                      req->rq_status);
                 }
-                req->rq_phase = RQ_PHASE_COMPLETE;
+                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
 
                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
                        "opc %s:%s:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
@@ -1279,9 +1306,13 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
                        lustre_msg_get_opc(req->rq_reqmsg));
 
-                set->set_remaining--;
-
+                spin_lock(&imp->imp_lock);
+                if (!list_empty(&req->rq_list))
+                        list_del_init(&req->rq_list);
                 atomic_dec(&imp->imp_inflight);
+                spin_unlock(&imp->imp_lock);
+
+                set->set_remaining--;
                 cfs_waitq_signal(&imp->imp_recovery_waitq);
         }
 
@@ -1290,7 +1321,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
 }
 
 /* Return 1 if we should give up, else 0 */
-int ptlrpc_expire_one_request(struct ptlrpc_request *req)
+int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
 {
         struct obd_import *imp = req->rq_import;
         int rc = 0;
@@ -1316,7 +1347,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req)
         req->rq_timedout = 1;
         spin_unlock(&req->rq_lock);
 
-        ptlrpc_unregister_reply (req);
+        ptlrpc_unregister_reply(req, async_unlink);
 
         if (obd_dump_on_timeout)
                 libcfs_debug_dumplog();
@@ -1373,24 +1404,24 @@ int ptlrpc_expired_set(void *data)
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
-                /* request in-flight? */
-                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting &&
-                       !req->rq_resend) ||
+                /* Request in-flight? */
+                if (!((req->rq_phase == RQ_PHASE_RPC &&
+                       !req->rq_waiting && !req->rq_resend) ||
                       (req->rq_phase == RQ_PHASE_BULK)))
                         continue;
 
-                if (req->rq_timedout ||                /* already dealt with */
+                if (req->rq_timedout ||     /* already dealt with */
                     req->rq_deadline > now) /* not expired */
                         continue;
 
-                /* deal with this guy */
-                ptlrpc_expire_one_request (req);
+                /* Deal with this guy. Do it asynchronously to not block
+                 * ptlrpcd thread. */
+                ptlrpc_expire_one_request(req, 1);
         }
 
         /* When waiting for a whole set, we always to break out of the
          * sleep so we can recalculate the timeout, or enable interrupts
-         * iff everyone's timed out.
-         */
+         * if everyone's timed out. */
         RETURN(1);
 }
@@ -1436,7 +1467,9 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
                 /* request in-flight? */
-                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
+                if (!(((req->rq_phase & (RQ_PHASE_RPC |
+                                         RQ_PHASE_UNREGISTERING)) &&
+                       !req->rq_waiting) ||
                       (req->rq_phase == RQ_PHASE_BULK) ||
                       (req->rq_phase == RQ_PHASE_NEW)))
                         continue;
@@ -1453,9 +1486,8 @@
                         timeout = 1;    /* ASAP */
                         break;
                 }
-                if ((timeout == 0) || (timeout > (deadline - now))) {
+                if ((timeout == 0) || (timeout > (deadline - now)))
                         timeout = deadline - now;
-                }
         }
         RETURN(timeout);
 }
@@ -1538,6 +1570,8 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
         struct ptlrpc_request_pool *pool = request->rq_pool;
 
         spin_lock(&pool->prp_lock);
+        LASSERT(list_empty(&request->rq_list));
+        LASSERT(!request->rq_receiving_reply);
         list_add_tail(&request->rq_list, &pool->prp_req_list);
         spin_unlock(&pool->prp_lock);
 }
@@ -1648,24 +1682,41 @@ EXPORT_SYMBOL(ptlrpc_req_xid);
  * IDEMPOTENT, but _not_ safe against concurrent callers.
  * The request owner (i.e. the thread doing the I/O) must call...
  */
-void ptlrpc_unregister_reply (struct ptlrpc_request *request)
+int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 {
         int                rc;
         cfs_waitq_t       *wq;
         struct l_wait_info lwi;
         ENTRY;
 
-        LASSERT(!in_interrupt ());             /* might sleep */
+        /* Might sleep. */
+        LASSERT(!in_interrupt());
+
+        /* Let's setup deadline for reply unlink. */
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            request->rq_reply_deadline == 0)
+                request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
 
+        /* Nothing left to do. */
         if (!ptlrpc_client_recv_or_unlink(request))
-                /* Nothing left to do */
-                return;
+                RETURN(1);
+
+        LNetMDUnlink(request->rq_reply_md_h);
+
+        /* Let's check it once again. */
+        if (!ptlrpc_client_recv_or_unlink(request))
+                RETURN(1);
+
+        /* Move to "Unregistering" phase as reply was not unlinked yet. */
+        ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
 
-        LNetMDUnlink (request->rq_reply_md_h);
+        /* Do not wait for unlink to finish. */
+        if (async)
+                RETURN(0);
 
         /* We have to l_wait_event() whatever the result, to give liblustre
          * a chance to run reply_in_callback(), and to make sure we've
          * unlinked before returning a req to the pool */
-
         if (request->rq_set != NULL)
                 wq = &request->rq_set->set_waitq;
         else
@@ -1675,17 +1726,19 @@
         /* Network access will complete in finite time but the HUGE
          * timeout lets us CWARN for visibility of sluggish NALs */
         lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
-        rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
-                           &lwi);
-        if (rc == 0)
-                return;
+        rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
+                          &lwi);
+        if (rc == 0) {
+                ptlrpc_rqphase_move(request, request->rq_next_phase);
+                RETURN(1);
+        }
 
-        LASSERT (rc == -ETIMEDOUT);
+        LASSERT(rc == -ETIMEDOUT);
         DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
                   "rvcng=%d unlnk=%d", request->rq_receiving_reply,
                   request->rq_must_unlink);
         }
-        EXIT;
+        RETURN(0);
 }
 
 /* caller must hold imp->imp_lock */
@@ -1779,7 +1832,7 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
                        old_xid, req->rq_xid);
         }
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
 }
@@ -1792,7 +1845,7 @@
         spin_lock(&req->rq_lock);
         req->rq_restart = 1;
         req->rq_timedout = 0;
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
 }
@@ -1883,7 +1936,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                lustre_msg_get_opc(req->rq_reqmsg));
 
         /* Mark phase here for a little debug help */
-        req->rq_phase = RQ_PHASE_RPC;
+        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
         spin_lock(&imp->imp_lock);
         req->rq_import_generation = imp->imp_generation;
@@ -1975,7 +2028,7 @@ restart:
         } while ((brc == -ETIMEDOUT) &&
                  (req->rq_deadline > cfs_time_current_sec()));
 
-        if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req)) {
+        if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req, 0)) {
                 /* Wait forever for reconnect / replay or failure */
                 lwi = LWI_INTR(interrupted_request, req);
                 brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req),
@@ -1995,8 +2048,7 @@ restart:
         /* If the reply was received normally, this just grabs the spinlock
          * (ensuring the reply callback has returned), sees that
          * req->rq_receiving_reply is clear and returns. */
-        ptlrpc_unregister_reply (req);
-
+        ptlrpc_unregister_reply(req, 0);
 
         if (req->rq_err) {
                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
@@ -2025,7 +2077,7 @@ restart:
                 GOTO(out, rc = -ETIMEDOUT);
         }
 
-        if (!req->rq_replied) {
+        if (!ptlrpc_client_replied(req)) {
                 /* How can this be? -eeb */
                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
                 LBUG();
         }
@@ -2066,7 +2118,7 @@ restart:
         }
 
         LASSERT(!req->rq_receiving_reply);
-        req->rq_phase = RQ_PHASE_INTERPRET;
+        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
         atomic_dec(&imp->imp_inflight);
         cfs_waitq_signal(&imp->imp_recovery_waitq);
@@ -2087,7 +2139,7 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         ENTRY;
         atomic_dec(&imp->imp_replay_inflight);
 
-        if (!req->rq_replied) {
+        if (!ptlrpc_client_replied(req)) {
                 CERROR("request replay timed out, restarting recovery\n");
                 GOTO(out, rc = -ETIMEDOUT);
         }
@@ -2126,7 +2178,7 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         if (req->rq_replay_cb)
                 req->rq_replay_cb(req);
 
-        if (req->rq_replied &&
+        if (ptlrpc_client_replied(req) &&
             lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
                           lustre_msg_get_status(req->rq_repmsg),
@@ -2166,6 +2218,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         aa->praa_old_state = req->rq_send_state;
         req->rq_send_state = LUSTRE_IMP_REPLAY;
         req->rq_phase = RQ_PHASE_NEW;
+        req->rq_next_phase = RQ_PHASE_UNDEFINED;
         if (req->rq_repmsg)
                 aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
         req->rq_status = 0;
@@ -2207,7 +2260,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
                         req->rq_status = -EINTR;
-                        ptlrpc_wake_client_req(req);
+                        ptlrpc_client_wake_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }
@@ -2222,7 +2275,7 @@
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
                         req->rq_status = -EINTR;
-                        ptlrpc_wake_client_req(req);
+                        ptlrpc_client_wake_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }
@@ -2255,7 +2308,7 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set)
                 req->rq_err = 1;
                 req->rq_status = -EINTR;
-                ptlrpc_wake_client_req(req);
+                ptlrpc_client_wake_req(req);
                 spin_unlock (&req->rq_lock);
         }
 }
-- 
1.8.3.1
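Note for reviewers: the ptlrpc_invalidate_import() timeout change described
in the commit message lives outside client.c and is not shown in this diff.
The selection rule it describes can be modeled with a small, compilable toy
(all names here except the rq_deadline/obd_timeout semantics are invented
for illustration and are not the Lustre code):

    #include <stdio.h>
    #include <time.h>

    #define OBD_TIMEOUT 100  /* stands in for obd_timeout */

    /* Wait for the maximal (rq_deadline - now) over all inflight requests;
     * fall back to obd_timeout only when that comes out as 0. */
    static long invalidate_timeout(const long *deadlines, int n, long now)
    {
            long timeout = 0;
            int i;

            for (i = 0; i < n; i++) {
                    long left = deadlines[i] - now;
                    if (left > timeout)
                            timeout = left;
            }
            return timeout != 0 ? timeout : OBD_TIMEOUT;
    }

    int main(void)
    {
            long now = (long)time(NULL);
            long deadlines[] = { now + 7, now + 42, now + 13 };

            /* Prints 42: the wait covers the slowest inflight RPC, so it
             * cannot time out before the RPCs do, which is exactly the
             * inflight != 0 assertion failure the patch fixes. */
            printf("wait %ld seconds\n", invalidate_timeout(deadlines, 3, now));
            return 0;
    }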