From: yury
Date: Mon, 24 Nov 2008 16:44:04 +0000 (+0000)
Subject: b=17631
X-Git-Tag: v1_9_120~92
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=dde3e359fd9bfd9225b1a88089196d306d579c73

b=17631
r=shadow,panda

- fix long synchronous bulk unlink in ptlrpcd, which stops other RPCs
  from being handled and also causes an assertion at umount time;
- make sure the long unlink wait is done with a 1 sec interval so that
  it can return quickly.
---

diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 0561fea..8fa08d2 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -477,6 +477,7 @@ struct ptlrpc_request {
                                  so that servers' early reply updates to the
                                  deadline aren't kept in per-cpu cache */
         time_t rq_reply_deadline;      /* when req reply unlink must finish. */
+        time_t rq_bulk_deadline;       /* when req bulk unlink must finish. */
         int    rq_timeout;             /* service time estimate (secs) */

         /* Multi-rpc bits */
@@ -871,16 +872,38 @@ extern lnet_pid_t ptl_get_pid(void);
 int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
 void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
 int ptlrpc_register_bulk(struct ptlrpc_request *req);
-void ptlrpc_unregister_bulk (struct ptlrpc_request *req);
+int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async);

-static inline int ptlrpc_bulk_active (struct ptlrpc_bulk_desc *desc)
+static inline int ptlrpc_server_bulk_active(struct ptlrpc_bulk_desc *desc)
 {
-        int rc;
+        int rc;
+
+        LASSERT(desc != NULL);
+
+        spin_lock(&desc->bd_lock);
+        rc = desc->bd_network_rw;
+        spin_unlock(&desc->bd_lock);
+        return rc;
+}
+
+static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
+{
+        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+        int rc;
+
+        LASSERT(req != NULL);
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
+            req->rq_bulk_deadline > cfs_time_current_sec())
+                return 1;
+
+        if (!desc)
+                return 0;

         spin_lock(&desc->bd_lock);
         rc = desc->bd_network_rw;
         spin_unlock(&desc->bd_lock);
-        return (rc);
+        return rc;
 }

 #define PTLRPC_REPLY_MAYBE_DIFFICULT 0x01
@@ -1138,7 +1161,7 @@ ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
 static inline int
 ptlrpc_client_early(struct ptlrpc_request *req)
 {
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
             req->rq_reply_deadline > cfs_time_current_sec())
                 return 0;
         return req->rq_early;
@@ -1147,7 +1170,7 @@ ptlrpc_client_early(struct ptlrpc_request *req)
 static inline int
 ptlrpc_client_replied(struct ptlrpc_request *req)
 {
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
             req->rq_reply_deadline > cfs_time_current_sec())
                 return 0;
         return req->rq_replied;
@@ -1156,7 +1179,7 @@ ptlrpc_client_replied(struct ptlrpc_request *req)
 static inline int
 ptlrpc_client_recv(struct ptlrpc_request *req)
 {
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
             req->rq_reply_deadline > cfs_time_current_sec())
                 return 1;
         return req->rq_receiving_reply;
@@ -1168,7 +1191,7 @@ ptlrpc_client_recv_or_unlink(struct ptlrpc_request *req)
         int rc;

         spin_lock(&req->rq_lock);
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
             req->rq_reply_deadline > cfs_time_current_sec()) {
                 spin_unlock(&req->rq_lock);
                 return 1;
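The new ptlrpc_client_bulk_active() above folds a fail-injection check into
the "is the bulk still busy?" test: while OBD_FAIL_PTLRPC_LONG_BULK_UNLINK is
set and rq_bulk_deadline lies in the future, the request keeps reporting its
bulk as active, so tests can exercise the long-unlink path without a genuinely
stuck network. As a minimal standalone sketch of that pattern (all sim_*
names are hypothetical, not Lustre APIs):

/*
 * Sketch only, not Lustre code: under fail injection the request stays
 * "active" until a deadline passes, so callers keep polling it.
 */
#include <stdio.h>
#include <time.h>
#include <unistd.h>

struct sim_request {
        time_t bulk_deadline;   /* like rq_bulk_deadline; 0 = not simulated */
        int    network_rw;      /* like bd_network_rw: transfer in flight? */
};

static int sim_fail_loc_set = 1;        /* stands in for OBD_FAIL_CHECK() */

static int sim_client_bulk_active(const struct sim_request *req)
{
        /* Simulated long unlink: report "active" until the deadline. */
        if (sim_fail_loc_set && req->bulk_deadline > time(NULL))
                return 1;
        return req->network_rw;
}

int main(void)
{
        struct sim_request req = {
                .bulk_deadline = time(NULL) + 3,  /* 3 s simulated stall */
                .network_rw    = 0,               /* transfer already done */
        };

        /* A dispatcher loop just re-polls instead of blocking. */
        while (sim_client_bulk_active(&req)) {
                printf("bulk still 'active' (simulated), retrying in 1 s\n");
                sleep(1);
        }
        printf("bulk unlinked; the request can change phase\n");
        return 0;
}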
diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h
index a02b764..bcb16c9 100644
--- a/lustre/include/obd_support.h
+++ b/lustre/include/obd_support.h
@@ -291,9 +291,10 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_PTLRPC_PAUSE_REP        0x50c
 #define OBD_FAIL_PTLRPC_DUMP_LOG         0x50e
-#define OBD_FAIL_PTLRPC_LONG_UNLINK      0x50f
-#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x510
-#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x511
+#define OBD_FAIL_PTLRPC_LONG_REPL_UNLINK 0x50f
+#define OBD_FAIL_PTLRPC_LONG_BULK_UNLINK 0x510
+#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x511
+#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x512

 #define OBD_FAIL_OBD_PING_NET            0x600
 #define OBD_FAIL_OBD_LOG_CANCEL_NET      0x601
diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c
index 4d2eb41..6e582c1 100644
--- a/lustre/mdt/mdt_handler.c
+++ b/lustre/mdt/mdt_handler.c
@@ -1178,8 +1178,8 @@ static int mdt_sendpage(struct mdt_thread_info *info,
                 if (timeout < 0)
                         CERROR("Req deadline already passed %lu (now: %lu)\n",
                                req->rq_deadline, cfs_time_current_sec());
-                *lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);
-                rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
+                *lwi = LWI_TIMEOUT(cfs_time_seconds(max(timeout, 1)), NULL, NULL);
+                rc = l_wait_event(desc->bd_waitq, !ptlrpc_server_bulk_active(desc), lwi);
                 LASSERT (rc == 0 || rc == -ETIMEDOUT);

                 if (rc == 0) {
diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c
index bdea74e..3d56643 100644
--- a/lustre/ost/ost_handler.c
+++ b/lustre/ost/ost_handler.c
@@ -761,7 +761,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                           ost_bulk_timeout, desc);
                         rc = l_wait_event(desc->bd_waitq,
-                                          !ptlrpc_bulk_active(desc) ||
+                                          !ptlrpc_server_bulk_active(desc) ||
                                           exp->exp_failed, &lwi);
                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
                         /* Wait again if we changed deadline */
@@ -976,7 +976,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (desc->bd_export->exp_failed)
                 rc = -ENOTCONN;
         else
-                rc = ptlrpc_start_bulk_transfer (desc);
+                rc = ptlrpc_start_bulk_transfer(desc);
         if (rc == 0) {
                 time_t start = cfs_time_current_sec();
                 do {
@@ -987,7 +987,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                         lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
                                                    ost_bulk_timeout, desc);
                         rc = l_wait_event(desc->bd_waitq,
-                                          !ptlrpc_bulk_active(desc) ||
+                                          !ptlrpc_server_bulk_active(desc) ||
                                           desc->bd_export->exp_failed, &lwi);
                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
                         /* Wait again if we changed deadline */
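The server-side waits above, and the unlink waits later in this patch, all
move to a timeout-plus-interval scheme: instead of blocking once for the
whole window, the waiter wakes at least every second to re-check its
condition, which is what lets the long unlink wait "return quickly" as the
commit message promises. A standalone model of that behaviour with POSIX
threads (the wait_event_interval() and completer() helpers are hypothetical;
the real code uses l_wait_event() with LWI_TIMEOUT_INTERVAL()):

/* Sketch only: bounded-interval wait, re-checking every 'interval' secs. */
#include <pthread.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

static pthread_mutex_t lock  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  waitq = PTHREAD_COND_INITIALIZER;
static int bulk_done;   /* condition, like !ptlrpc_server_bulk_active() */

/* Wait up to 'timeout' seconds for bulk_done, waking at least once per
 * 'interval' seconds. Returns 0 on success, -1 on timeout. */
static int wait_event_interval(int timeout, int interval)
{
        time_t deadline = time(NULL) + timeout;
        int rc = -1;

        pthread_mutex_lock(&lock);
        while (!bulk_done && time(NULL) < deadline) {
                struct timespec ts = { .tv_sec = time(NULL) + interval };

                /* Bounded nap: a wedged peer delays us by at most one
                 * interval before we get to look around again. */
                pthread_cond_timedwait(&waitq, &lock, &ts);
        }
        if (bulk_done)
                rc = 0;
        pthread_mutex_unlock(&lock);
        return rc;
}

static void *completer(void *arg)
{
        sleep(2);                       /* pretend the unlink takes 2 s */
        pthread_mutex_lock(&lock);
        bulk_done = 1;
        pthread_cond_signal(&waitq);
        pthread_mutex_unlock(&lock);
        return NULL;
}

int main(void)
{
        pthread_t t;

        pthread_create(&t, NULL, completer, NULL);
        printf("wait rc = %d\n", wait_event_interval(300, 1));
        pthread_join(t, NULL);
        return 0;
}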
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 045365c..49694a1 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -101,8 +101,8 @@ static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal
         return desc;
 }

-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
-                                               int npages, int type, int portal)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
+                                              int npages, int type, int portal)
 {
         struct obd_import *imp = req->rq_import;
         struct ptlrpc_bulk_desc *desc;
@@ -1176,15 +1176,22 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                          * sure that all rdma transfers finished and will
                          * not corrupt any data. */
-                        if (ptlrpc_client_recv_or_unlink(req))
+                        if (ptlrpc_client_recv_or_unlink(req) ||
+                            ptlrpc_client_bulk_active(req))
                                 continue;

                         /*
                          * Turn fail_loc off to prevent it from looping
                          * forever.
                          */
-                        OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_UNLINK,
-                                             OBD_FAIL_ONCE);
+                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
+                                OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK,
+                                                     OBD_FAIL_ONCE);
+                        }
+                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
+                                OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK,
+                                                     OBD_FAIL_ONCE);
+                        }

                         /*
                          * Move to next phase if reply was successfully
@@ -1208,7 +1215,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         /*
                          * Check if we still need to wait for unlink.
                          */
-                        if (ptlrpc_client_recv_or_unlink(req))
+                        if (ptlrpc_client_recv_or_unlink(req) ||
+                            ptlrpc_client_bulk_active(req))
                                 continue;
                 }

@@ -1277,11 +1285,13 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);

                                 if (req->rq_bulk) {
-                                        __u64 old_xid = req->rq_xid;
+                                        __u64 old_xid;

-                                        ptlrpc_unregister_bulk(req);
+                                        if (!ptlrpc_unregister_bulk(req, 1))
+                                                continue;

                                         /* ensure previous bulk fails */
+                                        old_xid = req->rq_xid;
                                         req->rq_xid = ptlrpc_next_xid();
                                         CDEBUG(D_HA, "resend bulk "
                                                "old x"LPU64
@@ -1367,7 +1377,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         }

                         LASSERT(req->rq_phase == RQ_PHASE_BULK);
-                        if (ptlrpc_bulk_active(req->rq_bulk))
+                        if (ptlrpc_client_bulk_active(req))
                                 continue;

                         if (!req->rq_bulk->bd_success) {
@@ -1389,8 +1399,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         if (!ptlrpc_unregister_reply(req, 1))
                                 continue;

-                        if (req->rq_bulk != NULL)
-                                ptlrpc_unregister_bulk(req);
+                        if (!ptlrpc_unregister_bulk(req, 1))
+                                continue;

                         /* When calling interpret receiving already should be
                          * finished. */
@@ -1462,13 +1472,11 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
         spin_unlock(&req->rq_lock);

         ptlrpc_unregister_reply(req, async_unlink);
+        ptlrpc_unregister_bulk(req, async_unlink);

         if (obd_dump_on_timeout)
                 libcfs_debug_dumplog();

-        if (req->rq_bulk != NULL)
-                ptlrpc_unregister_bulk (req);
-
         if (imp == NULL) {
                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
                 RETURN(1);
@@ -1816,7 +1824,7 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         /*
          * Let's setup deadline for reply unlink.
          */
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
             async && request->rq_reply_deadline == 0)
                 request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;

@@ -1858,7 +1866,8 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         for (;;) {
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
                 rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
                                   &lwi);
                 if (rc == 0) {
@@ -2143,7 +2152,7 @@ restart:
                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);

                 if (req->rq_bulk != NULL) {
-                        ptlrpc_unregister_bulk (req);
+                        ptlrpc_unregister_bulk(req, 0);

                         /* bulk requests are supposed to be
                          * idempotent, so we are free to bump the xid
@@ -2266,7 +2275,7 @@ after_send:
                          * me.
                          */
                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
                         brc = l_wait_event(req->rq_reply_waitq,
-                                           !ptlrpc_bulk_active(req->rq_bulk),
+                                           !ptlrpc_client_bulk_active(req),
                                            &lwi);
                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
                         if (brc != 0) {
@@ -2279,7 +2288,7 @@ after_send:
                 }
         }
         if (rc < 0)
-                ptlrpc_unregister_bulk (req);
+                ptlrpc_unregister_bulk(req, 0);
         }

         LASSERT(!req->rq_receiving_reply);
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index fbc144d..22f3565 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -95,7 +95,7 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len,
         RETURN (0);
 }

-int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
+int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
 {
         struct ptlrpc_connection *conn = desc->bd_export->exp_connection;
         int rc;
@@ -162,16 +162,16 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         RETURN(0);
 }

-void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
+/* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
+ * serialises with completion callback) */
+void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
 {
-        /* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
-         * serialises with completion callback) */
-        struct l_wait_info lwi;
-        int rc;
+        struct l_wait_info lwi;
+        int rc;

-        LASSERT (!in_interrupt ());             /* might sleep */
+        LASSERT(!in_interrupt());               /* might sleep */

-        if (!ptlrpc_bulk_active(desc))          /* completed or */
+        if (!ptlrpc_server_bulk_active(desc))   /* completed or */
                 return;                         /* never started */

         /* Do not send any meaningful data over the wire for evicted clients */
@@ -183,14 +183,15 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
          * but we must still l_wait_event() in this case, to give liblustre
          * a chance to run server_bulk_callback()*/

-        LNetMDUnlink (desc->bd_md_h);
+        LNetMDUnlink(desc->bd_md_h);

         for (;;) {
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT (cfs_time_seconds(300), NULL, NULL);
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
                 rc = l_wait_event(desc->bd_waitq,
-                                  !ptlrpc_bulk_active(desc), &lwi);
+                                  !ptlrpc_server_bulk_active(desc), &lwi);
                 if (rc == 0)
                         return;

@@ -199,7 +200,7 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
         }
 }

-int ptlrpc_register_bulk (struct ptlrpc_request *req)
+int ptlrpc_register_bulk(struct ptlrpc_request *req)
 {
         struct ptlrpc_bulk_desc *desc = req->rq_bulk;
         lnet_process_id_t peer;
@@ -272,28 +273,44 @@ int ptlrpc_register_bulk (struct ptlrpc_request *req)
         RETURN(0);
 }

-void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
+/* Disconnect a bulk desc from the network. Idempotent. Not
+ * thread-safe (i.e. only interlocks with completion callback). */
+int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
 {
-        /* Disconnect a bulk desc from the network. Idempotent. Not
-         * thread-safe (i.e. only interlocks with completion callback). */
         struct ptlrpc_bulk_desc *desc = req->rq_bulk;
         cfs_waitq_t             *wq;
         struct l_wait_info       lwi;
         int                      rc;
+        ENTRY;
+
+        LASSERT(!in_interrupt());               /* might sleep */

-        LASSERT (!in_interrupt ());             /* might sleep */
+        /* Let's setup deadline for reply unlink. */
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
+            async && req->rq_bulk_deadline == 0)
+                req->rq_bulk_deadline = cfs_time_current_sec() + LONG_UNLINK;

-        if (!ptlrpc_bulk_active(desc))          /* completed or */
-                return;                         /* never registered */
+        if (!ptlrpc_client_bulk_active(req))    /* completed or */
+                RETURN(1);                      /* never registered */

-        LASSERT (desc->bd_req == req);          /* bd_req NULL until registered */
+        LASSERT(desc->bd_req == req);           /* bd_req NULL until registered */

         /* the unlink ensures the callback happens ASAP and is the last
          * one. If it fails, it must be because completion just happened,
          * but we must still l_wait_event() in this case to give liblustre
          * a chance to run client_bulk_callback() */

-        LNetMDUnlink (desc->bd_md_h);
+        LNetMDUnlink(desc->bd_md_h);
+
+        if (!ptlrpc_client_bulk_active(req))    /* completed or */
+                RETURN(1);                      /* never registered */
+
+        /* Move to "Unregistering" phase as bulk was not unlinked yet. */
+        ptlrpc_rqphase_move(req, RQ_PHASE_UNREGISTERING);
+
+        /* Do not wait for unlink to finish. */
+        if (async)
+                RETURN(0);

         if (req->rq_set != NULL)
                 wq = &req->rq_set->set_waitq;
@@ -303,15 +320,19 @@ void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
         for (;;) {
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT (cfs_time_seconds(300), NULL, NULL);
-                rc = l_wait_event(*wq, !ptlrpc_bulk_active(desc), &lwi);
-                if (rc == 0)
-                        return;
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
+                rc = l_wait_event(*wq, !ptlrpc_client_bulk_active(req), &lwi);
+                if (rc == 0) {
+                        ptlrpc_rqphase_move(req, req->rq_next_phase);
+                        RETURN(1);
+                }

-                LASSERT (rc == -ETIMEDOUT);
-                DEBUG_REQ(D_WARNING,req,"Unexpectedly long timeout: desc %p",
+                LASSERT(rc == -ETIMEDOUT);
+                DEBUG_REQ(D_WARNING, req, "Unexpectedly long timeout: desc %p",
                           desc);
         }
+        RETURN(0);
 }

 static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
@@ -356,7 +377,7 @@ static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
         }
 }

-int ptlrpc_send_reply (struct ptlrpc_request *req, int flags)
+int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
 {
         struct ptlrpc_service     *svc = req->rq_rqbd->rqbd_service;
         struct ptlrpc_reply_state *rs  = req->rq_reply_state;
@@ -636,13 +657,13 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         LASSERT(!request->rq_receiving_reply);

  cleanup_bulk:
-        if (request->rq_bulk != NULL)
-                ptlrpc_unregister_bulk(request);
-
+        /* We do sync unlink here as there was no real transfer here so
+         * the chance to have long unlink to sluggish net is smaller here. */
+        ptlrpc_unregister_bulk(request, 0);
         return rc;
 }

-int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
+int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
 {
         struct ptlrpc_service *service = rqbd->rqbd_service;
         static lnet_process_id_t match_id = {LNET_NID_ANY, LNET_PID_ANY};
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index fb0d8d0..2422ac5 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -1976,7 +1976,8 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)

                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
                 rc = l_wait_event(service->srv_waitq,
                                   service->srv_nrqbd_receiving == 0,
                                   &lwi);
diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh
index 5827f17..b20ff18 100644
--- a/lustre/tests/sanityN.sh
+++ b/lustre/tests/sanityN.sh
@@ -788,14 +788,14 @@ test_34() { #16129
         done
         if [ $OPER == "timeout" ] ; then
                 for j in `seq $OSTCOUNT`; do
-                        #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x510
-                        do_facet ost$j lctl set_param fail_loc=0x510
+                        #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x511
+                        do_facet ost$j lctl set_param fail_loc=0x511
                 done
                 echo lock should expire
         else
                 for j in `seq $OSTCOUNT`; do
-                        #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x511
-                        do_facet ost$j lctl set_param fail_loc=0x511
+                        #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x512
+                        do_facet ost$j lctl set_param fail_loc=0x512
                 done
                 echo lock should not expire
         fi
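The heart of the change is the new contract of ptlrpc_unregister_bulk(req,
async): with async set it never blocks; it returns 1 if the bulk is already
detached and 0 if the unlink is still in flight, after parking the request in
RQ_PHASE_UNREGISTERING so that ptlrpc_check_set() will poll it again. That is
what keeps a single-threaded dispatcher such as ptlrpcd from stalling behind
one sluggish peer. A toy model of that contract, under the assumption that
the sim_* names are hypothetical stand-ins rather than the real API:

/* Sketch only: the async/sync unregister contract, not Lustre code. */
#include <stdio.h>
#include <unistd.h>

struct sim_request {
        int network_rw;                 /* bulk MD still attached? */
        int phase_unregistering;
};

static void sim_md_unlink(struct sim_request *req)
{
        /* In this sketch the "network" finishes instantly; in real life
         * the completion callback may arrive much later. */
        req->network_rw = 0;
}

/* Returns 1 when the bulk is fully detached, 0 when the caller must
 * come back later (async mode only). */
static int sim_unregister_bulk(struct sim_request *req, int async)
{
        if (!req->network_rw)
                return 1;               /* completed or never registered */

        sim_md_unlink(req);

        if (req->network_rw) {
                req->phase_unregistering = 1;
                if (async)
                        return 0;       /* do not block the dispatcher */
                while (req->network_rw)
                        sleep(1);       /* sync: bounded 1 s re-checks */
        }
        return 1;
}

int main(void)
{
        struct sim_request req = { .network_rw = 1 };

        if (!sim_unregister_bulk(&req, 1))
                printf("still unlinking; dispatcher moves to next request\n");
        else
                printf("bulk detached immediately\n");
        return 0;
}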