From 52779b4deda7c26d4d0ee5ef0c21a7349d3bfdec Mon Sep 17 00:00:00 2001 From: isaac Date: Thu, 15 Oct 2009 04:05:36 +0000 Subject: [PATCH] b=17103,i=nikita,i=green,i=isaac: - Rewrite ptlrpc_queue_wait() using sets (from rread). --- lustre/include/lustre_lib.h | 28 +++- lustre/ptlrpc/client.c | 331 ++++++-------------------------------------- lustre/tests/conf-sanity.sh | 4 +- 3 files changed, 67 insertions(+), 296 deletions(-) diff --git a/lustre/include/lustre_lib.h b/lustre/include/lustre_lib.h index d5547ce..e0e26fa 100644 --- a/lustre/include/lustre_lib.h +++ b/lustre/include/lustre_lib.h @@ -606,6 +606,7 @@ static inline void obd_ioctl_freedata(char *buf, int len) struct l_wait_info { cfs_duration_t lwi_timeout; cfs_duration_t lwi_interval; + int lwi_allow_intr; int (*lwi_on_timeout)(void *); void (*lwi_on_signal)(void *); void *lwi_cb_data; @@ -617,7 +618,8 @@ struct l_wait_info { .lwi_timeout = time, \ .lwi_on_timeout = cb, \ .lwi_cb_data = data, \ - .lwi_interval = 0 \ + .lwi_interval = 0, \ + .lwi_allow_intr = 0 \ }) #define LWI_TIMEOUT_INTERVAL(time, interval, cb, data) \ @@ -625,16 +627,28 @@ struct l_wait_info { .lwi_timeout = time, \ .lwi_on_timeout = cb, \ .lwi_cb_data = data, \ - .lwi_interval = interval \ + .lwi_interval = interval, \ + .lwi_allow_intr = 0 \ }) #define LWI_TIMEOUT_INTR(time, time_cb, sig_cb, data) \ ((struct l_wait_info) { \ .lwi_timeout = time, \ .lwi_on_timeout = time_cb, \ - .lwi_on_signal = sig_cb, \ + .lwi_on_signal = sig_cb, \ .lwi_cb_data = data, \ - .lwi_interval = 0 \ + .lwi_interval = 0, \ + .lwi_allow_intr = 0 \ +}) + +#define LWI_TIMEOUT_INTR_ALL(time, time_cb, sig_cb, data) \ +((struct l_wait_info) { \ + .lwi_timeout = time, \ + .lwi_on_timeout = time_cb, \ + .lwi_on_signal = sig_cb, \ + .lwi_cb_data = data, \ + .lwi_interval = 0, \ + .lwi_allow_intr = 1 \ }) #define LWI_INTR(cb, data) LWI_TIMEOUT_INTR(0, NULL, cb, data) @@ -650,6 +664,7 @@ do { \ cfs_waitlink_t __wait; \ cfs_duration_t __timeout = info->lwi_timeout; \ cfs_sigset_t __blocked; \ + int __allow_intr = info->lwi_allow_intr; \ \ ret = 0; \ if (condition) \ @@ -662,7 +677,7 @@ do { \ cfs_waitq_add(&wq, &__wait); \ \ /* Block all signals (just the non-fatal ones if no timeout). */ \ - if (info->lwi_on_signal != NULL && __timeout == 0) \ + if (info->lwi_on_signal != NULL && (__timeout == 0 || __allow_intr)) \ __blocked = l_w_e_set_sigs(LUSTRE_FATAL_SIGS); \ else \ __blocked = l_w_e_set_sigs(0); \ @@ -700,7 +715,8 @@ do { \ if (condition) \ break; \ if (cfs_signal_pending()) { \ - if (info->lwi_on_signal != NULL && __timeout == 0) { \ + if (info->lwi_on_signal != NULL && \ + (__timeout == 0 || __allow_intr)) { \ if (info->lwi_on_signal != LWI_ON_SIGNAL_NOOP) \ info->lwi_on_signal(info->lwi_cb_data);\ ret = -EINTR; \ diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 5b28d32..20b575e 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -915,44 +915,6 @@ static int ptlrpc_import_delay_req(struct obd_import *imp, RETURN(delay); } -static int ptlrpc_check_reply(struct ptlrpc_request *req) -{ - int rc = 0; - ENTRY; - - /* serialise with network callback */ - spin_lock(&req->rq_lock); - - if (ptlrpc_client_replied(req)) - GOTO(out, rc = 1); - - if (req->rq_net_err && !req->rq_timedout) { - spin_unlock(&req->rq_lock); - rc = ptlrpc_expire_one_request(req, 0); - spin_lock(&req->rq_lock); - GOTO(out, rc); - } - - if (req->rq_err) - GOTO(out, rc = 1); - - if (req->rq_resend) - GOTO(out, rc = 1); - - if (req->rq_restart) - GOTO(out, rc = 1); - - if (ptlrpc_client_early(req)) { - ptlrpc_at_recv_early_reply(req); - GOTO(out, rc = 0); /* keep waiting */ - } - - EXIT; - out: - spin_unlock(&req->rq_lock); - DEBUG_REQ(D_NET, req, "rc = %d for", rc); - return rc; -} /* Conditionally suppress specific console messages */ static int ptlrpc_console_allow(struct ptlrpc_request *req) @@ -978,7 +940,6 @@ static int ptlrpc_console_allow(struct ptlrpc_request *req) return 1; } - static int ptlrpc_check_status(struct ptlrpc_request *req) { int err; @@ -1342,11 +1303,15 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) GOTO(interpret, req->rq_status); } - /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr - * will only be set after rq_timedout, but the synchronous IO - * waiting path sets rq_intr irrespective of whether ptlrpcd - * has seen a timeout. our policy is to only interpret - * interrupted rpcs after they have timed out */ + /* ptlrpc_set_wait->l_wait_event sets lwi_allow_intr + * so it sets rq_intr regardless of individual rpc + * timeouts. The synchronous IO waiting path sets + * rq_intr irrespective of whether ptlrpcd + * has seen a timeout. Our policy is to only interpret + * interrupted rpcs after they have timed out, so we + * need to enforce that here. + */ + if (req->rq_intr && (req->rq_timedout || req->rq_waiting || req->rq_wait_ctx)) { req->rq_status = -EINTR; @@ -1757,11 +1722,27 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) * req times out */ CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n", set, timeout); - lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout ? timeout : 1), - ptlrpc_expired_set, - ptlrpc_interrupted_set, set); - rc = l_wait_event(set->set_waitq, - ptlrpc_check_set(NULL, set), &lwi); + + if (timeout == 0 && !cfs_signal_pending()) + /* + * No requests are in-flight (ether timed out + * or delayed), so we can allow interrupts. + * We still want to block for a limited time, + * so we allow interrupts during the timeout. + */ + lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1), + ptlrpc_expired_set, + ptlrpc_interrupted_set, set); + else + /* + * At least one request is in flight, so no + * interrupts are allowed. Wait until all + * complete, or an in-flight req times out. + */ + lwi = LWI_TIMEOUT(cfs_time_seconds(timeout? timeout : 1), + ptlrpc_expired_set, set); + + rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi); LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT); @@ -2098,35 +2079,6 @@ void ptlrpc_restart_req(struct ptlrpc_request *req) spin_unlock(&req->rq_lock); } -static int expired_request(void *data) -{ - struct ptlrpc_request *req = data; - ENTRY; - - /* - * Some failure can suspend regular timeouts. - */ - if (ptlrpc_check_suspend()) - RETURN(1); - - /* - * Deadline may have changed with an early reply. - */ - if (req->rq_deadline > cfs_time_current_sec()) - RETURN(1); - - RETURN(ptlrpc_expire_one_request(req, 0)); -} - -static void interrupted_request(void *data) -{ - struct ptlrpc_request *req = data; - DEBUG_REQ(D_HA, req, "request interrupted"); - spin_lock(&req->rq_lock); - req->rq_intr = 1; - spin_unlock(&req->rq_lock); -} - struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req) { ENTRY; @@ -2187,225 +2139,28 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, int ptlrpc_queue_wait(struct ptlrpc_request *req) { - int rc = 0; - int brc; - struct l_wait_info lwi; - struct obd_import *imp = req->rq_import; - cfs_duration_t timeout = CFS_TICK; - long timeoutl; + struct ptlrpc_request_set *set; + int rc; ENTRY; LASSERT(req->rq_set == NULL); LASSERT(!req->rq_receiving_reply); - /* for distributed debugging */ - lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid()); - LASSERT(imp->imp_obd != NULL); - CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc " - "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(), - imp->imp_obd->obd_uuid.uuid, - lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, - libcfs_nid2str(imp->imp_connection->c_peer.nid), - lustre_msg_get_opc(req->rq_reqmsg)); - - /* Mark phase here for a little debug help */ - ptlrpc_rqphase_move(req, RQ_PHASE_RPC); - - spin_lock(&imp->imp_lock); - req->rq_import_generation = imp->imp_generation; -restart: - if (ptlrpc_import_delay_req(imp, req, &rc)) { - list_del_init(&req->rq_list); - list_add_tail(&req->rq_list, &imp->imp_delayed_list); - atomic_inc(&imp->imp_inflight); - spin_unlock(&imp->imp_lock); - - DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)", - cfs_curproc_comm(), - ptlrpc_import_state_name(req->rq_send_state), - ptlrpc_import_state_name(imp->imp_state)); - lwi = LWI_INTR(interrupted_request, req); - rc = l_wait_event(req->rq_reply_waitq, - (req->rq_send_state == imp->imp_state || - req->rq_err || req->rq_intr), - &lwi); - DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)", - cfs_curproc_comm(), - ptlrpc_import_state_name(imp->imp_state), - ptlrpc_import_state_name(req->rq_send_state), - req->rq_err, req->rq_intr); - - spin_lock(&imp->imp_lock); - list_del_init(&req->rq_list); - atomic_dec(&imp->imp_inflight); - - if (req->rq_err) { - /* rq_status was set locally */ - rc = req->rq_status ? req->rq_status : -EIO; - } - else if (req->rq_intr) { - rc = -EINTR; - } - else if (req->rq_no_resend) { - rc = -ETIMEDOUT; - } else { - GOTO(restart, rc); - } - } - - if (rc != 0) { - spin_unlock(&imp->imp_lock); - req->rq_status = rc; // XXX this ok? - GOTO(out, rc); - } - - if (req->rq_resend) { - if (req->rq_bulk != NULL) { - ptlrpc_unregister_bulk(req, 0); - - /* bulk requests are supposed to be - * idempotent, so we are free to bump the xid - * here, which we need to do before - * registering the bulk again (bug 6371). - * print the old xid first for sanity. - */ - DEBUG_REQ(D_HA, req, "bumping xid for bulk: "); - req->rq_xid = ptlrpc_next_xid(); - } - - DEBUG_REQ(D_HA, req, "resending: "); - } - - /* XXX this is the same as ptlrpc_set_wait */ - LASSERT(list_empty(&req->rq_list)); - list_add_tail(&req->rq_list, &imp->imp_sending_list); - atomic_inc(&imp->imp_inflight); - spin_unlock(&imp->imp_lock); - - rc = sptlrpc_req_refresh_ctx(req, 0); - if (rc) { - if (req->rq_err) { - /* we got fatal ctx refresh error, directly jump out - * thus we can pass back the actual error code. - */ - spin_lock(&imp->imp_lock); - list_del_init(&req->rq_list); - atomic_dec(&imp->imp_inflight); - spin_unlock(&imp->imp_lock); - - CERROR("Failed to refresh ctx of req %p: %d\n", - req, rc); - GOTO(out, rc); - } - /* simulating we got error during send rpc */ - goto after_send; - } - - rc = ptl_send_rpc(req, 0); - if (rc) - DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc); -repeat: - timeoutl = req->rq_deadline - cfs_time_current_sec(); - timeout = (timeoutl <= 0 || rc) ? CFS_TICK : - cfs_time_seconds(timeoutl); - DEBUG_REQ(D_NET, req, - "-- sleeping for "CFS_DURATION_T" ticks", timeout); - lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request, - req); - brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi); - if (brc == -ETIMEDOUT && ((req->rq_deadline > cfs_time_current_sec()) || - ptlrpc_check_and_wait_suspend(req))) - goto repeat; - -after_send: - CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:opc " - "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(), - imp->imp_obd->obd_uuid.uuid, - lustre_msg_get_status(req->rq_reqmsg), req->rq_xid, - libcfs_nid2str(imp->imp_connection->c_peer.nid), - lustre_msg_get_opc(req->rq_reqmsg)); - - /* If the reply was received normally, this just grabs the spinlock - * (ensuring the reply callback has returned), sees that - * req->rq_receiving_reply is clear and returns. */ - ptlrpc_unregister_reply(req, 0); - - spin_lock(&imp->imp_lock); - list_del_init(&req->rq_list); - atomic_dec(&imp->imp_inflight); - spin_unlock(&imp->imp_lock); - - if (req->rq_err) { - DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d", - rc, req->rq_status); - rc = rc ? rc : req->rq_status; - GOTO(out, rc = rc ? rc : -EIO); - } - - if (req->rq_intr) { - /* Should only be interrupted if we timed out. */ - if (!req->rq_timedout) - DEBUG_REQ(D_ERROR, req, - "rq_intr set but rq_timedout not"); - GOTO(out, rc = -EINTR); - } - - /* Resend if we need to */ - if (req->rq_resend||req->rq_timedout) { - /* ...unless we were specifically told otherwise. */ - if (req->rq_no_resend) - GOTO(out, rc = -ETIMEDOUT); - spin_lock(&imp->imp_lock); - /* we can have rq_timeout on dlm fake import which not support - * recovery - but me need resend request on this import instead - * of return error */ - req->rq_resend = 1; - goto restart; - } - - if (!ptlrpc_client_replied(req)) { - /* How can this be? -eeb */ - DEBUG_REQ(D_ERROR, req, "!rq_replied: "); - LBUG(); - GOTO(out, rc = req->rq_status); + set = ptlrpc_prep_set(); + if (set == NULL) { + CERROR("Unable to allocate ptlrpc set."); + RETURN(-ENOMEM); } - rc = after_reply(req); - /* NB may return +ve success rc */ - if (req->rq_resend) { - spin_lock(&imp->imp_lock); - goto restart; - } + /* for distributed debugging */ + lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid()); - out: - if (req->rq_bulk != NULL) { - if (rc >= 0) { - /* success so far. Note that anything going wrong - * with bulk now, is EXTREMELY strange, since the - * server must have believed that the bulk - * transferred OK before she replied with success to - * me. */ - lwi = LWI_TIMEOUT(timeout, NULL, NULL); - brc = l_wait_event(req->rq_reply_waitq, - !ptlrpc_client_bulk_active(req), - &lwi); - LASSERT(brc == 0 || brc == -ETIMEDOUT); - if (brc != 0) { - LASSERT(brc == -ETIMEDOUT); - DEBUG_REQ(D_ERROR, req, "bulk timed out"); - rc = brc; - } else if (!req->rq_bulk->bd_success) { - DEBUG_REQ(D_ERROR, req, "bulk transfer failed"); - rc = -EIO; - } - } - if (rc < 0) - ptlrpc_unregister_bulk(req, 0); - } + /* add a ref for the set (see comment in ptlrpc_set_add_req) */ + ptlrpc_request_addref(req); + ptlrpc_set_add_req(set, req); + rc = ptlrpc_set_wait(set); + ptlrpc_set_destroy(set); - LASSERT(!req->rq_receiving_reply); - ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - cfs_waitq_broadcast(&imp->imp_recovery_waitq); RETURN(rc); } diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh index cb2f716..e300c28 100644 --- a/lustre/tests/conf-sanity.sh +++ b/lustre/tests/conf-sanity.sh @@ -656,8 +656,8 @@ test_23a() { # was test_23 echo "waiting for mount to finish ... " WAIT=$(( WAIT + sleep)) done - [ "$WAIT" -eq "$MAX_WAIT" ] && error "MOUNT_PID $MOUNT_PID and \ - MOUNT__LUSTRE_PID $MOUNT__LUSTRE_PID still not killed in $WAIT secs" + [ "$WAIT" -eq "$MAX_WAIT" ] && error "MOUNT_PID $MOUNT_PID and "\ + "MOUNT_LUSTRE_PID $MOUNT_LUSTRE_PID still not killed in $WAIT secs" ps -ef | grep mount stop_mds || error stop_ost || error -- 1.8.3.1