From ec03e2002c26ec9e4dfc357cd78a2b0738cb4060 Mon Sep 17 00:00:00 2001 From: "hongchao.zhang" Date: Sun, 7 Mar 2010 14:52:55 +0800 Subject: [PATCH] b=21938 use req->rq_set itself during recovery during recovery, uses req->rq_set itself to replay the request instead of ptlrpcd_recovery_pc i=tappro@sun.com i=johann@sun.com --- lustre/include/lustre_lib.h | 5 +++++ lustre/include/lustre_net.h | 7 +++++-- lustre/ptlrpc/client.c | 45 ++++++++++++++++++++++++++++++++------------- lustre/ptlrpc/import.c | 5 ----- lustre/ptlrpc/pinger.c | 4 ++-- lustre/ptlrpc/ptlrpcd.c | 33 +++++++++++++++++++++++++++++---- 6 files changed, 73 insertions(+), 26 deletions(-) diff --git a/lustre/include/lustre_lib.h b/lustre/include/lustre_lib.h index d8efcf1..a7f92aee 100644 --- a/lustre/include/lustre_lib.h +++ b/lustre/include/lustre_lib.h @@ -595,6 +595,11 @@ static inline void obd_ioctl_freedata(char *buf, int len) * */ +static inline int back_to_sleep(void *arg) +{ + return 0; +} + #define LWI_ON_SIGNAL_NOOP ((void (*)(void *))(-1)) struct l_wait_info { diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 4d90174..2d6abab 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -194,7 +194,7 @@ struct ptlrpc_request_set; typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int); struct ptlrpc_request_set { - int set_remaining; /* # uncompleted requests */ + atomic_t set_remaining; /* # uncompleted requests */ cfs_waitq_t set_waitq; cfs_waitq_t *set_wakeup_ptr; struct list_head set_requests; @@ -326,7 +326,9 @@ struct ptlrpc_request { rq_reply_truncate:1, /* reply is truncated */ rq_fake:1, /* fake request - just for timeout only */ /* the request is queued to replay during recovery */ - rq_copy_queued:1; + rq_copy_queued:1, + /* whether the "rq_set" is a valid one */ + rq_invalid_rqset:1; enum rq_phase rq_phase; /* one of RQ_PHASE_* */ enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */ atomic_t 
rq_refcount; /* client-side refcount for SENT race, @@ -397,6 +399,7 @@ struct ptlrpc_request { /* Multi-rpc bits */ struct list_head rq_set_chain; + cfs_waitq_t rq_set_waitq; struct ptlrpc_request_set *rq_set; int (*rq_interpret_reply)(struct ptlrpc_request *req, void *data, int rc); /* async interpret handler */ diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index ef872ec..73f174a 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -594,6 +594,7 @@ ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode, CFS_INIT_LIST_HEAD(&request->rq_history_list); CFS_INIT_LIST_HEAD(&request->rq_exp_list); cfs_waitq_init(&request->rq_reply_waitq); + cfs_waitq_init(&request->rq_set_waitq); request->rq_xid = ptlrpc_next_xid(); atomic_set(&request->rq_refcount, 1); @@ -650,6 +651,7 @@ struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp, CFS_INIT_LIST_HEAD(&request->rq_history_list); CFS_INIT_LIST_HEAD(&request->rq_exp_list); cfs_waitq_init(&request->rq_reply_waitq); + cfs_waitq_init(&request->rq_set_waitq); request->rq_xid = ptlrpc_next_xid(); atomic_set(&request->rq_refcount, 1); @@ -664,7 +666,7 @@ void ptlrpc_fakereq_finished(struct ptlrpc_request *req) struct ptlrpc_request_set *set = req->rq_set; if (set) - set->set_remaining --; + atomic_dec(&set->set_remaining); } ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE); @@ -682,7 +684,7 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void) RETURN(NULL); CFS_INIT_LIST_HEAD(&set->set_requests); cfs_waitq_init(&set->set_waitq); - set->set_remaining = 0; + atomic_set(&set->set_remaining, 0); spin_lock_init(&set->set_new_req_lock); CFS_INIT_LIST_HEAD(&set->set_new_requests); CFS_INIT_LIST_HEAD(&set->set_cblist); @@ -700,7 +702,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) ENTRY; /* Requests on the set should either all be completed, or all be new */ - expected_phase = (set->set_remaining == 0) ? + expected_phase = (atomic_read(&set->set_remaining) == 0) ? 
RQ_PHASE_COMPLETE : RQ_PHASE_NEW; list_for_each (tmp, &set->set_requests) { struct ptlrpc_request *req = @@ -710,8 +712,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) n++; } - LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n", - set->set_remaining, n); + LASSERTF(atomic_read(&set->set_remaining) == 0 || + atomic_read(&set->set_remaining) == n, "%d / %d\n", + atomic_read(&set->set_remaining), n); list_for_each_safe(tmp, next, &set->set_requests) { struct ptlrpc_request *req = @@ -733,14 +736,19 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) interpreter(req, &req->rq_async_args, req->rq_status); } - set->set_remaining--; + atomic_dec(&set->set_remaining); } + spin_lock(&req->rq_lock); req->rq_set = NULL; + req->rq_invalid_rqset = 0; + spin_unlock(&req->rq_lock); + + cfs_waitq_signal(&req->rq_set_waitq); ptlrpc_req_finished (req); } - LASSERT(set->set_remaining == 0); + LASSERT(atomic_read(&set->set_remaining) == 0); OBD_FREE(set, sizeof(*set)); EXIT; @@ -767,9 +775,10 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set, struct ptlrpc_request *req) { /* The set takes over the caller's request reference */ + LASSERT(list_empty(&req->rq_set_chain)); list_add_tail(&req->rq_set_chain, &set->set_requests); req->rq_set = set; - set->set_remaining++; + atomic_inc(&set->set_remaining); } /** @@ -792,6 +801,7 @@ int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, /* * The set takes over the caller's request reference. 
*/ + LASSERT(list_empty(&req->rq_set_chain)); list_add_tail(&req->rq_set_chain, &set->set_new_requests); req->rq_set = set; spin_unlock(&set->set_new_req_lock); @@ -1164,7 +1174,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set) int force_timer_recalc = 0; ENTRY; - if (set->set_remaining == 0) + if (atomic_read(&set->set_remaining) == 0) RETURN(1); list_for_each(tmp, &set->set_requests) { @@ -1409,12 +1419,12 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set) } spin_unlock(&imp->imp_lock); - set->set_remaining--; + atomic_dec(&set->set_remaining); cfs_waitq_broadcast(&imp->imp_recovery_waitq); } /* If we hit an error, we want to recover promptly. */ - RETURN(set->set_remaining == 0 || force_timer_recalc); + RETURN(atomic_read(&set->set_remaining) == 0 || force_timer_recalc); } /* Return 1 if we should give up, else 0 */ @@ -1626,9 +1636,18 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) * EINTR. * I don't really care if we go once more round the loop in * the error cases -eeb. 
*/ - } while (rc != 0 || set->set_remaining != 0); + if (rc == 0 && atomic_read(&set->set_remaining) == 0) { + list_for_each(tmp, &set->set_requests) { + req = list_entry(tmp, struct ptlrpc_request, + rq_set_chain); + spin_lock(&req->rq_lock); + req->rq_invalid_rqset = 1; + spin_unlock(&req->rq_lock); + } + } + } while (rc != 0 || atomic_read(&set->set_remaining) != 0); - LASSERT(set->set_remaining == 0); + LASSERT(atomic_read(&set->set_remaining) == 0); rc = 0; list_for_each(tmp, &set->set_requests) { diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index 06e4c3c..f7424e4 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -1372,11 +1372,6 @@ int ptlrpc_import_recovery_state_machine(struct obd_import *imp) RETURN(rc); } -static int back_to_sleep(void *unused) -{ - return 0; -} - int ptlrpc_disconnect_import(struct obd_import *imp, int noclose) { struct ptlrpc_request *req; diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c index 130621f..547c2f6 100644 --- a/lustre/ptlrpc/pinger.c +++ b/lustre/ptlrpc/pinger.c @@ -806,7 +806,7 @@ static int pinger_check_rpcs(void *arg) mutex_up(&pinger_sem); /* Might be empty, that's OK. 
*/ - if (set->set_remaining == 0) + if (atomic_read(&set->set_remaining) == 0) CDEBUG(D_RPCTRACE, "nothing to ping\n"); list_for_each(iter, &set->set_requests) { @@ -855,7 +855,7 @@ do_check_set: atomic_dec(&imp->imp_inflight); } spin_unlock(&imp->imp_lock); - set->set_remaining--; + atomic_dec(&set->set_remaining); } mutex_up(&pinger_sem); diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c index 0bcc528..f771b1e 100644 --- a/lustre/ptlrpc/ptlrpcd.c +++ b/lustre/ptlrpc/ptlrpcd.c @@ -85,9 +85,9 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set) list_del_init(&req->rq_set_chain); req->rq_set = NULL; ptlrpcd_add_req(req); - set->set_remaining--; + atomic_dec(&set->set_remaining); } - LASSERT(set->set_remaining == 0); + LASSERT(atomic_read(&set->set_remaining) == 0); } EXPORT_SYMBOL(ptlrpcd_add_rqset); @@ -100,6 +100,31 @@ int ptlrpcd_add_req(struct ptlrpc_request *req) struct ptlrpcd_ctl *pc; int rc; + spin_lock(&req->rq_lock); + if (req->rq_invalid_rqset) { + cfs_duration_t timeout; + struct l_wait_info lwi; + + req->rq_invalid_rqset = 0; + spin_unlock(&req->rq_lock); + + timeout = cfs_time_seconds(5); + lwi = LWI_TIMEOUT(timeout, back_to_sleep, NULL); + l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi); + } else if (req->rq_set) { + LASSERT(req->rq_phase == RQ_PHASE_NEW); + LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY); + + /* ptlrpc_check_set will decrease the count */ + atomic_inc(&req->rq_set->set_remaining); + spin_unlock(&req->rq_lock); + + cfs_waitq_signal(&req->rq_set->set_waitq); + return 0; + } else { + spin_unlock(&req->rq_lock); + } + if (req->rq_send_state == LUSTRE_IMP_FULL) pc = &ptlrpcd_pc; else @@ -146,7 +171,7 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) } spin_unlock(&pc->pc_set->set_new_req_lock); - if (pc->pc_set->set_remaining) { + if (atomic_read(&pc->pc_set->set_remaining)) { rc = rc | ptlrpc_check_set(pc->pc_set); /* @@ -273,7 +298,7 @@ int ptlrpcd_idle(void *arg) struct ptlrpcd_ctl *pc = arg; return 
(list_empty(&pc->pc_set->set_new_requests) && - pc->pc_set->set_remaining == 0); + atomic_read(&pc->pc_set->set_remaining) == 0); } #endif -- 1.8.3.1