From 4a51b7a884e536da4e6a3386e8d25a2bd2b445a2 Mon Sep 17 00:00:00 2001
From: "hongchao.zhang"
Date: Wed, 21 Apr 2010 08:54:53 +0800
Subject: [PATCH] b=21938 use the same set during replay

Some callers use their own ptlrpc_request_set to process their requests,
but during recovery Lustre processes requests through a dedicated
ptlrpc_request_set instead.  This patch fixes the problem by allowing a
request to keep using its own set during replay if it has one.

i=johann@sun.com
i=tappro@sun.com
---
(Two standalone sketches of the counter change and of the replay
hand-off are appended after the patch.)

 lustre/include/lustre_lib.h |  4 ++++
 lustre/include/lustre_net.h |  9 ++++++---
 lustre/ptlrpc/client.c      | 38 +++++++++++++++++++++++++-------------
 lustre/ptlrpc/import.c      |  5 -----
 lustre/ptlrpc/pinger.c      |  4 ++--
 lustre/ptlrpc/ptlrpcd.c     | 33 +++++++++++++++++++++++++++++----
 lustre/ptlrpc/sec.c         |  1 +
 7 files changed, 67 insertions(+), 27 deletions(-)

diff --git a/lustre/include/lustre_lib.h b/lustre/include/lustre_lib.h
index d15277c..5e2627b 100644
--- a/lustre/include/lustre_lib.h
+++ b/lustre/include/lustre_lib.h
@@ -607,6 +607,10 @@ static inline void obd_ioctl_freedata(char *buf, int len)
  * XXX nikita: some ptlrpc daemon threads have races of that sort.
  *
  */
+static inline int back_to_sleep(void *arg)
+{
+        return 0;
+}
 
 #define LWI_ON_SIGNAL_NOOP ((void (*)(void *))(-1))
 
diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 8c3f7d6..d507cb8 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -222,7 +222,7 @@ struct ptlrpc_request_set;
 typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
 
 struct ptlrpc_request_set {
-        int               set_remaining; /* # uncompleted requests */
+        cfs_atomic_t      set_remaining; /* # uncompleted requests */
         cfs_waitq_t       set_waitq;
         cfs_waitq_t      *set_wakeup_ptr;
         cfs_list_t        set_requests;
@@ -375,7 +375,9 @@ struct ptlrpc_request {
                 rq_hp:1,                /* high priority RPC */
                 rq_at_linked:1,         /* link into service's srv_at_array */
                 rq_reply_truncate:1,
-                rq_committed:1;
+                rq_committed:1,
+                /* whether the "rq_set" is a valid one */
+                rq_invalid_rqset:1;
 
         enum rq_phase rq_phase; /* one of RQ_PHASE_* */
         enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
@@ -492,7 +494,8 @@ struct ptlrpc_request {
         int rq_timeout;         /* service time estimate (secs) */
 
         /* Multi-rpc bits */
-        cfs_list_t rq_set_chain;
+        cfs_list_t rq_set_chain;
+        cfs_waitq_t rq_set_waitq;
         struct ptlrpc_request_set *rq_set;
         /** Async completion handler */
         ptlrpc_interpterer_t rq_interpret_reply;
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index eb69133..b04fccb 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -533,6 +533,7 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
         CFS_INIT_LIST_HEAD(&request->rq_history_list);
         CFS_INIT_LIST_HEAD(&request->rq_exp_list);
         cfs_waitq_init(&request->rq_reply_waitq);
+        cfs_waitq_init(&request->rq_set_waitq);
         request->rq_xid = ptlrpc_next_xid();
         cfs_atomic_set(&request->rq_refcount, 1);
 
@@ -715,6 +716,7 @@ struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
         CFS_INIT_LIST_HEAD(&request->rq_history_list);
         CFS_INIT_LIST_HEAD(&request->rq_exp_list);
         cfs_waitq_init(&request->rq_reply_waitq);
+        cfs_waitq_init(&request->rq_set_waitq);
         request->rq_xid = ptlrpc_next_xid();
         cfs_atomic_set(&request->rq_refcount, 1);
 
@@ -729,7 +731,7 @@ void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
                 struct ptlrpc_request_set *set = req->rq_set;
 
                 if (set)
-                        set->set_remaining --;
+                        cfs_atomic_dec(&set->set_remaining);
         }
 
         ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
@@ -747,7 +749,7 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
                 RETURN(NULL);
         CFS_INIT_LIST_HEAD(&set->set_requests);
         cfs_waitq_init(&set->set_waitq);
-        set->set_remaining = 0;
+        cfs_atomic_set(&set->set_remaining, 0);
         cfs_spin_lock_init(&set->set_new_req_lock);
         CFS_INIT_LIST_HEAD(&set->set_new_requests);
         CFS_INIT_LIST_HEAD(&set->set_cblist);
@@ -765,7 +767,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
         ENTRY;
 
         /* Requests on the set should either all be completed, or all be new */
-        expected_phase = (set->set_remaining == 0) ?
+        expected_phase = (cfs_atomic_read(&set->set_remaining) == 0) ?
                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
         cfs_list_for_each (tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -776,8 +778,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 n++;
         }
 
-        LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n",
-                 set->set_remaining, n);
+        LASSERTF(cfs_atomic_read(&set->set_remaining) == 0 ||
+                 cfs_atomic_read(&set->set_remaining) == n, "%d / %d\n",
+                 cfs_atomic_read(&set->set_remaining), n);
 
         cfs_list_for_each_safe(tmp, next, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -789,14 +792,14 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 
                 if (req->rq_phase == RQ_PHASE_NEW) {
                         ptlrpc_req_interpret(NULL, req, -EBADR);
-                        set->set_remaining--;
+                        cfs_atomic_dec(&set->set_remaining);
                 }
 
                 req->rq_set = NULL;
                 ptlrpc_req_finished (req);
         }
 
-        LASSERT(set->set_remaining == 0);
+        LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
         OBD_FREE(set, sizeof(*set));
         EXIT;
@@ -824,7 +827,7 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
         /* The set takes over the caller's request reference */
         cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
         req->rq_set = set;
-        set->set_remaining++;
+        cfs_atomic_inc(&set->set_remaining);
 }
 
 /**
@@ -1227,7 +1230,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
         int force_timer_recalc = 0;
         ENTRY;
 
-        if (set->set_remaining == 0)
+        if (cfs_atomic_read(&set->set_remaining) == 0)
                 RETURN(1);
 
         cfs_list_for_each(tmp, &set->set_requests) {
@@ -1539,12 +1542,12 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                 }
                 cfs_spin_unlock(&imp->imp_lock);
 
-                set->set_remaining--;
+                cfs_atomic_dec(&set->set_remaining);
                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
         }
 
         /* If we hit an error, we want to recover promptly. */
-        RETURN(set->set_remaining == 0 || force_timer_recalc);
+        RETURN(cfs_atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
 }
 
 /* Return 1 if we should give up, else 0 */
@@ -1794,9 +1797,18 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                  * EINTR.
                  * I don't really care if we go once more round the loop in
                  * the error cases -eeb. */
-        } while (rc != 0 || set->set_remaining != 0);
+                if (rc == 0 && cfs_atomic_read(&set->set_remaining) == 0) {
+                        cfs_list_for_each(tmp, &set->set_requests) {
+                                req = cfs_list_entry(tmp, struct ptlrpc_request,
+                                                     rq_set_chain);
+                                cfs_spin_lock(&req->rq_lock);
+                                req->rq_invalid_rqset = 1;
+                                cfs_spin_unlock(&req->rq_lock);
+                        }
+                }
+        } while (rc != 0 || cfs_atomic_read(&set->set_remaining) != 0);
 
-        LASSERT(set->set_remaining == 0);
+        LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
         rc = 0;
         cfs_list_for_each(tmp, &set->set_requests) {
diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c
index d2a9ad4..95eb543 100644
--- a/lustre/ptlrpc/import.c
+++ b/lustre/ptlrpc/import.c
@@ -1371,11 +1371,6 @@ out:
         RETURN(rc);
 }
 
-static int back_to_sleep(void *unused)
-{
-        return 0;
-}
-
 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
 {
         struct ptlrpc_request *req;
diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c
index fdc172f..8584adb 100644
--- a/lustre/ptlrpc/pinger.c
+++ b/lustre/ptlrpc/pinger.c
@@ -809,7 +809,7 @@ static int pinger_check_rpcs(void *arg)
         cfs_mutex_up(&pinger_sem);
 
         /* Might be empty, that's OK. */
-        if (set->set_remaining == 0)
+        if (cfs_atomic_read(&set->set_remaining) == 0)
                 CDEBUG(D_RPCTRACE, "nothing to ping\n");
 
         cfs_list_for_each(iter, &set->set_requests) {
@@ -858,7 +858,7 @@ do_check_set:
                         cfs_atomic_dec(&imp->imp_inflight);
                 }
                 cfs_spin_unlock(&imp->imp_lock);
-                set->set_remaining--;
+                cfs_atomic_dec(&set->set_remaining);
         }
         cfs_mutex_up(&pinger_sem);
 
diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c
index e6bbf18..225db05 100644
--- a/lustre/ptlrpc/ptlrpcd.c
+++ b/lustre/ptlrpc/ptlrpcd.c
@@ -119,9 +119,9 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
                 cfs_list_del_init(&req->rq_set_chain);
                 req->rq_set = NULL;
                 ptlrpcd_add_req(req, PSCOPE_OTHER);
-                set->set_remaining--;
+                cfs_atomic_dec(&set->set_remaining);
         }
-        LASSERT(set->set_remaining == 0);
+        LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 }
 EXPORT_SYMBOL(ptlrpcd_add_rqset);
 
@@ -136,6 +136,31 @@ int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
         int rc;
 
         LASSERT(scope < PSCOPE_NR);
+
+        cfs_spin_lock(&req->rq_lock);
+        if (req->rq_invalid_rqset) {
+                cfs_duration_t timeout;
+                struct l_wait_info lwi;
+
+                req->rq_invalid_rqset = 0;
+                cfs_spin_unlock(&req->rq_lock);
+
+                timeout = cfs_time_seconds(5);
+                lwi = LWI_TIMEOUT(timeout, back_to_sleep, NULL);
+                l_wait_event(req->rq_reply_waitq, (req->rq_set == NULL), &lwi);
+        } else if (req->rq_set) {
+                LASSERT(req->rq_phase == RQ_PHASE_NEW);
+                LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
+
+                /* ptlrpc_check_set will decrease the count */
+                cfs_atomic_inc(&req->rq_set->set_remaining);
+                cfs_spin_unlock(&req->rq_lock);
+
+                cfs_waitq_signal(&req->rq_set->set_waitq);
+        } else {
+                cfs_spin_unlock(&req->rq_lock);
+        }
+
         pt = req->rq_send_state == LUSTRE_IMP_FULL ? PT_NORMAL : PT_RECOVERY;
         pc = &ptlrpcd_scopes[scope].pscope_thread[pt].pt_ctl;
         rc = ptlrpc_set_add_new_req(pc, req);
@@ -184,7 +209,7 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
         }
         cfs_spin_unlock(&pc->pc_set->set_new_req_lock);
 
-        if (pc->pc_set->set_remaining) {
+        if (cfs_atomic_read(&pc->pc_set->set_remaining)) {
                 rc = rc | ptlrpc_check_set(env, pc->pc_set);
 
                 /*
@@ -346,7 +371,7 @@ int ptlrpcd_idle(void *arg)
         struct ptlrpcd_ctl *pc = arg;
 
         return (cfs_list_empty(&pc->pc_set->set_new_requests) &&
-                pc->pc_set->set_remaining == 0);
+                cfs_atomic_read(&pc->pc_set->set_remaining) == 0);
 }
 
 #endif
diff --git a/lustre/ptlrpc/sec.c b/lustre/ptlrpc/sec.c
index defe085..fce57c7 100644
--- a/lustre/ptlrpc/sec.c
+++ b/lustre/ptlrpc/sec.c
@@ -896,6 +896,7 @@ int sptlrpc_import_check_ctx(struct obd_import *imp)
         cfs_atomic_set(&req->rq_refcount, 10000);
         CFS_INIT_LIST_HEAD(&req->rq_ctx_chain);
         cfs_waitq_init(&req->rq_reply_waitq);
+        cfs_waitq_init(&req->rq_set_waitq);
         req->rq_import = imp;
         req->rq_flvr = sec->ps_flvr;
         req->rq_cli_ctx = ctx;
-- 
1.8.3.1
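
Sketch 1: why set_remaining becomes an atomic counter.  This is an
illustration written for this posting, not part of the patch, and it only
restates what the diff above changes.  Once ptlrpcd_add_req() may re-raise
the counter of a caller-owned set while the caller (or a ptlrpcd thread)
is still completing requests on that set, the counter is updated from more
than one thread, so the plain int is replaced by cfs_atomic_t.  rpc_set,
rpc_req and the helpers below are stand-ins invented for the example
(standalone C11, no Lustre headers needed):

#include <stdatomic.h>
#include <stdio.h>

struct rpc_set {
        atomic_int set_remaining;       /* # uncompleted requests */
};

struct rpc_req {
        struct rpc_set *rq_set;         /* set currently owning the request */
};

static void set_add_req(struct rpc_set *set, struct rpc_req *req)
{
        req->rq_set = set;
        atomic_fetch_add(&set->set_remaining, 1);   /* like cfs_atomic_inc() */
}

static void req_completed(struct rpc_req *req)
{
        atomic_fetch_sub(&req->rq_set->set_remaining, 1); /* like cfs_atomic_dec() */
}

int main(void)
{
        struct rpc_set set = { .set_remaining = 0 };
        struct rpc_req req = { 0 };

        set_add_req(&set, &req);
        req_completed(&req);
        printf("remaining: %d\n", atomic_load(&set.set_remaining));
        return 0;
}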
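
Sketch 2: the replay hand-off added to ptlrpcd_add_req(), modelled with
userspace pthreads instead of Lustre waitqueues.  Again this is only an
illustration; toy_set, toy_req and toy_add_req are invented names, and it
reflects one reading of the patch: a request whose set was already waited
out (rq_invalid_rqset) sleeps until the stale rq_set is detached, since
back_to_sleep() returning 0 makes the 5 second LWI_TIMEOUT go back to
sleep rather than abort; a request that still carries its caller's own set
during replay re-raises set_remaining and wakes the set so that
ptlrpc_check_set() can account for it; anything else proceeds as before.
The patch itself waits on rq_reply_waitq for this condition; the model
uses a per-request condition variable standing in for that waitqueue.

#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

struct toy_set {
        atomic_int     set_remaining;   /* # uncompleted requests */
        pthread_cond_t set_waitq;       /* wakes whoever is checking the set */
};

struct toy_req {
        pthread_mutex_t rq_lock;
        int             rq_invalid_rqset; /* rq_set points at a finished set */
        struct toy_set *rq_set;
        pthread_cond_t  rq_set_waitq;   /* signalled when rq_set is cleared */
};

static void toy_add_req(struct toy_req *req)
{
        pthread_mutex_lock(&req->rq_lock);
        if (req->rq_invalid_rqset) {
                /* The previous owner has finished waiting on this set; sleep
                 * until it really detaches rq_set before queueing the request
                 * again (back_to_sleep() makes the timeout non-fatal). */
                req->rq_invalid_rqset = 0;
                while (req->rq_set != NULL)
                        pthread_cond_wait(&req->rq_set_waitq, &req->rq_lock);
                pthread_mutex_unlock(&req->rq_lock);
        } else if (req->rq_set != NULL) {
                /* Replay: keep using the caller's own set.  The set checker
                 * drops set_remaining again when it handles this request, so
                 * raise it here and wake whoever is polling the set. */
                atomic_fetch_add(&req->rq_set->set_remaining, 1);
                pthread_mutex_unlock(&req->rq_lock);
                pthread_cond_signal(&req->rq_set->set_waitq);
        } else {
                pthread_mutex_unlock(&req->rq_lock);
        }
        /* ...then hand the request to the daemon thread as before... */
}

int main(void)
{
        struct toy_req req = {
                .rq_lock      = PTHREAD_MUTEX_INITIALIZER,
                .rq_set_waitq = PTHREAD_COND_INITIALIZER,
        };

        toy_add_req(&req);      /* no set attached: takes the trivial branch */
        return 0;
}

As in the patch, the replay branch only increments the counter and signals
the set; actually processing the request is left to the set's checker,
which balances the count when it gets there.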