From 020924966d4acd98e96ede094d72a49b762233db Mon Sep 17 00:00:00 2001
From: Johann Lombardi
Date: Fri, 4 May 2012 17:53:44 +0200
Subject: [PATCH] LU-1373 ptlrpc: add flow control extension to ptlrpc req set

This patch allows new requests to be added to a request set that
already has requests in flight. This is done by adding a callback
function, invoked by the request set, to generate RPCs. The request
set fires a new RPC each time one completes, keeping the number of
RPCs in flight equal to set->set_max_inflight. Lock callbacks can thus
be sent by the service thread again, which avoids doing disk I/O from
the ptlrpcd context.
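For illustration, a caller drives a flow-controlled set roughly as
follows (sketch only, not part of the patch; my_producer, struct
my_work, no_work_left() and build_next_rpc() are hypothetical, and
error handling is elided):

    /* A set_producer_func queues one new request on the set and
     * returns 0, or returns -ENOENT once it has nothing left to
     * produce. */
    static int my_producer(struct ptlrpc_request_set *set, void *arg)
    {
            struct my_work *work = arg;        /* hypothetical */

            if (no_work_left(work))            /* hypothetical helper */
                    return -ENOENT;            /* stop producing */

            /* with a producer attached, the set sends the request
             * straight away, see ptlrpc_set_add_req() below */
            ptlrpc_set_add_req(set, build_next_rpc(work));
            return 0;
    }

    set = ptlrpc_prep_fcset(8 /* max RPCs in flight */, my_producer, work);
    if (set == NULL)
            return -ENOMEM;
    rc = ptlrpc_set_wait(set);    /* drains the producer, then waits */
    ptlrpc_set_destroy(set);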
Signed-off-by: Johann Lombardi
Change-Id: If95922fa8da1dfa7ce7c98d6bfe35e9a5f5bb34f
Reviewed-on: http://review.whamcloud.com/2650
Tested-by: Hudson
Reviewed-by: Fan Yong
Reviewed-by: Jinshan Xiong
Tested-by: Maloo
Reviewed-by: Oleg Drokin
---
 lustre/include/lustre_net.h |  69 +++++++-----
 lustre/ldlm/ldlm_internal.h |  19 +---
 lustre/ldlm/ldlm_lock.c     | 254 +++++++++++++++++++++++---------------------
 lustre/ldlm/ldlm_lockd.c    |  32 +++---
 lustre/ptlrpc/client.c      | 142 ++++++++++++++++++++-----
 5 files changed, 301 insertions(+), 215 deletions(-)

diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 7b47587..4d8341d 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -247,6 +247,7 @@ union ptlrpc_async_args {
 
 struct ptlrpc_request_set;
 typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
+typedef int (*set_producer_func)(struct ptlrpc_request_set *, void *);
 
 /**
  * Definition of request set structure.
@@ -260,34 +261,44 @@ typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
  * returned.
  */
 struct ptlrpc_request_set {
-        cfs_atomic_t          set_refcount;
-        /** number of in queue requests */
-        cfs_atomic_t          set_new_count;
-        /** number of uncompleted requests */
-        cfs_atomic_t          set_remaining;
-        /** wait queue to wait on for request events */
-        cfs_waitq_t           set_waitq;
-        cfs_waitq_t          *set_wakeup_ptr;
-        /** List of requests in the set */
-        cfs_list_t            set_requests;
-        /**
-         * List of completion callbacks to be called when the set is completed
-         * This is only used if \a set_interpret is NULL.
-         * Links struct ptlrpc_set_cbdata.
-         */
-        cfs_list_t            set_cblist;
-        /** Completion callback, if only one. */
-        set_interpreter_func  set_interpret;
-        /** opaq argument passed to completion \a set_interpret callback. */
-        void                 *set_arg;
-        /**
-         * Lock for \a set_new_requests manipulations
-         * locked so that any old caller can communicate requests to
-         * the set holder who can then fold them into the lock-free set
-         */
-        cfs_spinlock_t        set_new_req_lock;
-        /** List of new yet unsent requests. Only used with ptlrpcd now. */
-        cfs_list_t            set_new_requests;
+        cfs_atomic_t          set_refcount;
+        /** number of in queue requests */
+        cfs_atomic_t          set_new_count;
+        /** number of uncompleted requests */
+        cfs_atomic_t          set_remaining;
+        /** wait queue to wait on for request events */
+        cfs_waitq_t           set_waitq;
+        cfs_waitq_t          *set_wakeup_ptr;
+        /** List of requests in the set */
+        cfs_list_t            set_requests;
+        /**
+         * List of completion callbacks to be called when the set is completed
+         * This is only used if \a set_interpret is NULL.
+         * Links struct ptlrpc_set_cbdata.
+         */
+        cfs_list_t            set_cblist;
+        /** Completion callback, if only one. */
+        set_interpreter_func  set_interpret;
+        /** opaq argument passed to completion \a set_interpret callback. */
+        void                 *set_arg;
+        /** rq_status of requests that have been freed already */
+        int                   set_rc;
+        /**
+         * Lock for \a set_new_requests manipulations
+         * locked so that any old caller can communicate requests to
+         * the set holder who can then fold them into the lock-free set
+         */
+        cfs_spinlock_t        set_new_req_lock;
+        /** List of new yet unsent requests. Only used with ptlrpcd now. */
+        cfs_list_t            set_new_requests;
+
+        /** Additional fields used by the flow control extension */
+        /** Maximum number of RPCs in flight */
+        int                   set_max_inflight;
+        /** Callback function used to generate RPCs */
+        set_producer_func     set_producer;
+        /** opaq argument passed to the producer callback */
+        void                 *set_producer_arg;
 };
 
 /**
@@ -1480,6 +1491,8 @@ void ptlrpc_cleanup_imp(struct obd_import *imp);
 void ptlrpc_abort_set(struct ptlrpc_request_set *set);
 
 struct ptlrpc_request_set *ptlrpc_prep_set(void);
+struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
+                                             void *arg);
 int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
                       set_interpreter_func fn, void *data);
 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h
index 8f7cb5c..55d694b4 100644
--- a/lustre/ldlm/ldlm_internal.h
+++ b/lustre/ldlm/ldlm_internal.h
@@ -95,23 +95,12 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns);
 /* ldlm_lock.c */
 
 struct ldlm_cb_set_arg {
-        int                     type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
-        unsigned int            threshold; /* threshold to wake up the waiting proc */
-        cfs_atomic_t            rpcs; /* # of inflight rpcs in set */
-        cfs_atomic_t            restart;
-        cfs_atomic_t            refcount;
-        cfs_waitq_t             waitq;
+        struct ptlrpc_request_set *set;
+        int                        type; /* LDLM_{CP,BL}_CALLBACK */
+        cfs_atomic_t               restart;
+        cfs_list_t                *list;
 };
 
-static inline void ldlm_csa_put(struct ldlm_cb_set_arg *arg)
-{
-        if (cfs_atomic_dec_and_test(&arg->refcount)) {
-                LASSERT(cfs_atomic_read(&arg->rpcs) == 0);
-
-                OBD_FREE_PTR(arg);
-        }
-}
-
 typedef enum {
         LDLM_WORK_BL_AST,
         LDLM_WORK_CP_AST,
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 2b4392f..34ac565 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -1514,152 +1514,158 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
 #endif
 
 static int
-ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
+ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
-        struct ldlm_lock_desc d;
-        struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
-                                                l_bl_ast);
-        int rc;
-        ENTRY;
+        struct ldlm_cb_set_arg *arg = opaq;
+        struct ldlm_lock_desc   d;
+        int                     rc;
+        struct ldlm_lock       *lock;
+        ENTRY;
 
-        /* nobody should touch l_bl_ast */
-        lock_res_and_lock(lock);
-        cfs_list_del_init(&lock->l_bl_ast);
+        if (cfs_list_empty(arg->list))
+                RETURN(-ENOENT);
 
-        LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
-        LASSERT(lock->l_bl_ast_run == 0);
-        LASSERT(lock->l_blocking_lock);
-        lock->l_bl_ast_run++;
-        unlock_res_and_lock(lock);
+        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
 
-        ldlm_lock2desc(lock->l_blocking_lock, &d);
+        /* nobody should touch l_bl_ast */
+        lock_res_and_lock(lock);
+        cfs_list_del_init(&lock->l_bl_ast);
 
-        rc = lock->l_blocking_ast(lock, &d, (void *)arg,
-                                  LDLM_CB_BLOCKING);
-        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-        lock->l_blocking_lock = NULL;
-        LDLM_LOCK_RELEASE(lock);
+        LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+        LASSERT(lock->l_bl_ast_run == 0);
+        LASSERT(lock->l_blocking_lock);
+        lock->l_bl_ast_run++;
+        unlock_res_and_lock(lock);
 
-        RETURN(rc);
-}
+        ldlm_lock2desc(lock->l_blocking_lock, &d);
 
-static int
-ldlm_work_cp_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
-{
-        struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, l_cp_ast);
-        ldlm_completion_callback completion_callback;
-        int rc = 0;
-        ENTRY;
+        rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
+        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+        lock->l_blocking_lock = NULL;
+        LDLM_LOCK_RELEASE(lock);
 
-        /* It's possible to receive a completion AST before we've set
-         * the l_completion_ast pointer: either because the AST arrived
-         * before the reply, or simply because there's a small race
-         * window between receiving the reply and finishing the local
-         * enqueue. (bug 842)
-         *
-         * This can't happen with the blocking_ast, however, because we
-         * will never call the local blocking_ast until we drop our
-         * reader/writer reference, which we won't do until we get the
-         * reply and finish enqueueing. */
-
-        /* nobody should touch l_cp_ast */
-        lock_res_and_lock(lock);
-        cfs_list_del_init(&lock->l_cp_ast);
-        LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
-        /* save l_completion_ast since it can be changed by
-         * mds_intent_policy(), see bug 14225 */
-        completion_callback = lock->l_completion_ast;
-        lock->l_flags &= ~LDLM_FL_CP_REQD;
-        unlock_res_and_lock(lock);
-
-        if (completion_callback != NULL)
-                rc = completion_callback(lock, 0, (void *)arg);
-        LDLM_LOCK_RELEASE(lock);
+        RETURN(rc);
+}
 
-        RETURN(rc);
+static int
+ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
+{
+        struct ldlm_cb_set_arg *arg = opaq;
+        int                     rc = 0;
+        struct ldlm_lock       *lock;
+        ldlm_completion_callback completion_callback;
+        ENTRY;
+
+        if (cfs_list_empty(arg->list))
+                RETURN(-ENOENT);
+
+        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
+
+        /* It's possible to receive a completion AST before we've set
+         * the l_completion_ast pointer: either because the AST arrived
+         * before the reply, or simply because there's a small race
+         * window between receiving the reply and finishing the local
+         * enqueue. (bug 842)
+         *
+         * This can't happen with the blocking_ast, however, because we
+         * will never call the local blocking_ast until we drop our
+         * reader/writer reference, which we won't do until we get the
+         * reply and finish enqueueing. */
+
+        /* nobody should touch l_cp_ast */
+        lock_res_and_lock(lock);
+        cfs_list_del_init(&lock->l_cp_ast);
+        LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
+        /* save l_completion_ast since it can be changed by
+         * mds_intent_policy(), see bug 14225 */
+        completion_callback = lock->l_completion_ast;
+        lock->l_flags &= ~LDLM_FL_CP_REQD;
+        unlock_res_and_lock(lock);
+
+        if (completion_callback != NULL)
+                rc = completion_callback(lock, 0, (void *)arg);
+        LDLM_LOCK_RELEASE(lock);
+
+        RETURN(rc);
 }
 
 static int
-ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
+ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
-        struct ldlm_lock_desc desc;
-        struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
-                                                l_rk_ast);
-        int rc;
-        ENTRY;
+        struct ldlm_cb_set_arg *arg = opaq;
+        struct ldlm_lock_desc   desc;
+        int                     rc;
+        struct ldlm_lock       *lock;
+        ENTRY;
 
-        cfs_list_del_init(&lock->l_rk_ast);
+        if (cfs_list_empty(arg->list))
+                RETURN(-ENOENT);
 
-        /* the desc just pretend to exclusive */
-        ldlm_lock2desc(lock, &desc);
-        desc.l_req_mode = LCK_EX;
-        desc.l_granted_mode = 0;
+        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
+        cfs_list_del_init(&lock->l_rk_ast);
 
-        rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
-        LDLM_LOCK_RELEASE(lock);
+        /* the desc just pretends to be exclusive */
+        ldlm_lock2desc(lock, &desc);
+        desc.l_req_mode = LCK_EX;
+        desc.l_granted_mode = 0;
 
-        RETURN(rc);
+        rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
+        LDLM_LOCK_RELEASE(lock);
+
+        RETURN(rc);
 }
 
 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type)
 {
-        struct l_wait_info lwi = { 0 };
-        struct ldlm_cb_set_arg *arg;
-        cfs_list_t *tmp, *pos;
-        int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
-        unsigned int max_ast_count;
-        int rc;
-        ENTRY;
-
-        if (cfs_list_empty(rpc_list))
-                RETURN(0);
-
-        OBD_ALLOC_PTR(arg);
-        if (arg == NULL)
-                RETURN(-ENOMEM);
-
-        cfs_atomic_set(&arg->restart, 0);
-        cfs_atomic_set(&arg->rpcs, 0);
-        cfs_atomic_set(&arg->refcount, 1);
-        cfs_waitq_init(&arg->waitq);
-
-        switch (ast_type) {
-        case LDLM_WORK_BL_AST:
-                arg->type = LDLM_BL_CALLBACK;
-                work_ast_lock = ldlm_work_bl_ast_lock;
-                break;
-        case LDLM_WORK_CP_AST:
-                arg->type = LDLM_CP_CALLBACK;
-                work_ast_lock = ldlm_work_cp_ast_lock;
-                break;
-        case LDLM_WORK_REVOKE_AST:
-                arg->type = LDLM_BL_CALLBACK;
-                work_ast_lock = ldlm_work_revoke_ast_lock;
-                break;
-        default:
-                LBUG();
-        }
-
-        max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
-        arg->threshold = max_ast_count;
-
-        cfs_list_for_each_safe(tmp, pos, rpc_list) {
-                (void)work_ast_lock(tmp, arg);
-                if (cfs_atomic_read(&arg->rpcs) < max_ast_count)
-                        continue;
-
-                l_wait_event(arg->waitq,
-                             cfs_atomic_read(&arg->rpcs) < arg->threshold,
-                             &lwi);
-        }
-
-        arg->threshold = 1;
-        l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi);
-
-        rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
-        ldlm_csa_put(arg);
-        RETURN(rc);
+        struct ldlm_cb_set_arg *arg;
+        set_producer_func       work_ast_lock;
+        int                     rc;
+
+        if (cfs_list_empty(rpc_list))
+                RETURN(0);
+
+        OBD_ALLOC_PTR(arg);
+        if (arg == NULL)
+                RETURN(-ENOMEM);
+
+        cfs_atomic_set(&arg->restart, 0);
+        arg->list = rpc_list;
+
+        switch (ast_type) {
+        case LDLM_WORK_BL_AST:
+                arg->type = LDLM_BL_CALLBACK;
+                work_ast_lock = ldlm_work_bl_ast_lock;
+                break;
+        case LDLM_WORK_CP_AST:
+                arg->type = LDLM_CP_CALLBACK;
+                work_ast_lock = ldlm_work_cp_ast_lock;
+                break;
+        case LDLM_WORK_REVOKE_AST:
+                arg->type = LDLM_BL_CALLBACK;
+                work_ast_lock = ldlm_work_revoke_ast_lock;
+                break;
+        default:
+                LBUG();
+        }
+
+        /* We create a ptlrpc request set with flow control extension.
+         * This request set uses the work_ast_lock function to produce new
+         * requests and sends a new request each time one completes, in
+         * order to keep the number of requests in flight at
+         * ns_max_parallel_ast */
+        arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
+                                     work_ast_lock, arg);
+        if (arg->set == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        ptlrpc_set_wait(arg->set);
+        ptlrpc_set_destroy(arg->set);
+
+        rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
+        GOTO(out, rc);
+out:
+        OBD_FREE_PTR(arg);
+        return rc;
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 29a8683..2c65a42 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -677,10 +677,6 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         }
         LDLM_LOCK_RELEASE(lock);
 
-        if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
-                cfs_waitq_signal(&arg->waitq);
-
-        ldlm_csa_put(arg);
         RETURN(0);
 }
 
@@ -689,22 +685,20 @@ static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                                           struct ldlm_lock *lock,
                                           int instant_cancel)
 {
-        int rc = 0;
-        ENTRY;
-
-        if (unlikely(instant_cancel)) {
-                rc = ptl_send_rpc(req, 1);
-                ptlrpc_req_finished(req);
-                if (rc == 0)
-                        cfs_atomic_inc(&arg->restart);
-        } else {
-                LDLM_LOCK_GET(lock);
-                cfs_atomic_inc(&arg->rpcs);
-                cfs_atomic_inc(&arg->refcount);
-                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
-        }
+        int rc = 0;
+        ENTRY;
+
+        if (unlikely(instant_cancel)) {
+                rc = ptl_send_rpc(req, 1);
+                ptlrpc_req_finished(req);
+                if (rc == 0)
+                        cfs_atomic_inc(&arg->restart);
+        } else {
+                LDLM_LOCK_GET(lock);
+                ptlrpc_set_add_req(arg->set, req);
+        }
 
-        RETURN(rc);
+        RETURN(rc);
 }
 
 /**
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 71e8e3a..261e50d 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -52,6 +52,8 @@
 
 #include "ptlrpc_internal.h"
 
+static int ptlrpc_send_new_req(struct ptlrpc_request *req);
+
 /**
  * Initialize passed in client structure \a cl.
  */
@@ -869,25 +871,55 @@ void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
  */
 struct ptlrpc_request_set *ptlrpc_prep_set(void)
 {
-        struct ptlrpc_request_set *set;
+        struct ptlrpc_request_set *set;
 
-        ENTRY;
-        OBD_ALLOC(set, sizeof *set);
-        if (!set)
-                RETURN(NULL);
-        cfs_atomic_set(&set->set_refcount, 1);
-        CFS_INIT_LIST_HEAD(&set->set_requests);
-        cfs_waitq_init(&set->set_waitq);
-        cfs_atomic_set(&set->set_new_count, 0);
-        cfs_atomic_set(&set->set_remaining, 0);
-        cfs_spin_lock_init(&set->set_new_req_lock);
-        CFS_INIT_LIST_HEAD(&set->set_new_requests);
-        CFS_INIT_LIST_HEAD(&set->set_cblist);
+        ENTRY;
+        OBD_ALLOC(set, sizeof *set);
+        if (!set)
+                RETURN(NULL);
+        cfs_atomic_set(&set->set_refcount, 1);
+        CFS_INIT_LIST_HEAD(&set->set_requests);
+        cfs_waitq_init(&set->set_waitq);
+        cfs_atomic_set(&set->set_new_count, 0);
+        cfs_atomic_set(&set->set_remaining, 0);
+        cfs_spin_lock_init(&set->set_new_req_lock);
+        CFS_INIT_LIST_HEAD(&set->set_new_requests);
+        CFS_INIT_LIST_HEAD(&set->set_cblist);
+        set->set_max_inflight = UINT_MAX;
+        set->set_producer     = NULL;
+        set->set_producer_arg = NULL;
+        set->set_rc           = 0;
 
-        RETURN(set);
+        RETURN(set);
 }
 
 /**
+ * Allocate and initialize new request set structure with flow control
+ * extension. This extension makes it possible to control the number of
+ * requests in flight for the whole set. A callback function to generate
+ * requests must be provided and the set will keep the number of requests
+ * in flight at or below \a max.
+ * Returns a pointer to the newly allocated set structure or NULL on error.
+ */
+struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
+                                             void *arg)
+{
+        struct ptlrpc_request_set *set;
+
+        set = ptlrpc_prep_set();
+        if (!set)
+                RETURN(NULL);
+
+        set->set_max_inflight  = max;
+        set->set_producer      = func;
+        set->set_producer_arg  = arg;
+
+        RETURN(set);
+}
+EXPORT_SYMBOL(ptlrpc_prep_fcset);
+
+/**
  * Wind down and free request set structure previously allocated with
  * ptlrpc_prep_set.
 * Ensures that all requests on the set have completed and removes
@@ -975,18 +1007,23 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                        struct ptlrpc_request *req)
 {
         char jobid[JOBSTATS_JOBID_SIZE];
-        LASSERT(cfs_list_empty(&req->rq_set_chain));
+        LASSERT(cfs_list_empty(&req->rq_set_chain));
 
-        /* The set takes over the caller's request reference */
-        cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
-        req->rq_set = set;
-        cfs_atomic_inc(&set->set_remaining);
-        req->rq_queued_time = cfs_time_current();
+        /* The set takes over the caller's request reference */
+        cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
+        req->rq_set = set;
+        cfs_atomic_inc(&set->set_remaining);
+        req->rq_queued_time = cfs_time_current();
 
         if (req->rq_reqmsg) {
                 lustre_get_jobid(jobid);
                 lustre_msg_set_jobid(req->rq_reqmsg, jobid);
         }
+
+        if (set->set_producer != NULL)
+                /* If the request set has a producer callback, the RPC must be
+                 * sent straight away */
+                ptlrpc_send_new_req(req);
 }
 
 /**
@@ -1421,6 +1458,30 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
+{
+        int remaining, rc;
+        ENTRY;
+
+        LASSERT(set->set_producer != NULL);
+
+        remaining = cfs_atomic_read(&set->set_remaining);
+
+        /* populate the ->set_requests list with requests until we
+         * reach the maximum number of RPCs in flight for this set */
+        while (cfs_atomic_read(&set->set_remaining) < set->set_max_inflight) {
+                rc = set->set_producer(set, set->set_producer_arg);
+                if (rc == -ENOENT) {
+                        /* no more RPC to produce */
+                        set->set_producer     = NULL;
+                        set->set_producer_arg = NULL;
+                        RETURN(0);
+                }
+        }
+
+        RETURN((cfs_atomic_read(&set->set_remaining) - remaining));
+}
+
 /**
  * this sends any unsent RPCs in \a set and returns 1 if all are sent
  * and no more replies are expected.
@@ -1429,14 +1490,14 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
  */
 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-        cfs_list_t *tmp;
+        cfs_list_t *tmp, *next;
         int force_timer_recalc = 0;
         ENTRY;
 
         if (cfs_atomic_read(&set->set_remaining) == 0)
                 RETURN(1);
 
-        cfs_list_for_each(tmp, &set->set_requests) {
+        cfs_list_for_each_safe(tmp, next, &set->set_requests) {
                 struct ptlrpc_request *req =
                         cfs_list_entry(tmp, struct ptlrpc_request,
                                        rq_set_chain);
@@ -1759,6 +1820,25 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                 cfs_atomic_dec(&set->set_remaining);
                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
+
+                if (set->set_producer) {
+                        /* produce a new request if possible */
+                        if (ptlrpc_set_producer(set) > 0)
+                                force_timer_recalc = 1;
+
+                        /* free the request that has just been completed
+                         * in order not to pollute set->set_requests */
+                        cfs_list_del_init(&req->rq_set_chain);
+                        cfs_spin_lock(&req->rq_lock);
+                        req->rq_set = NULL;
+                        req->rq_invalid_rqset = 0;
+                        cfs_spin_unlock(&req->rq_lock);
+
+                        /* record rq_status to compute the final status later */
+                        if (req->rq_status != 0)
+                                set->set_rc = req->rq_status;
+                        ptlrpc_req_finished(req);
+                }
         }
 
         /* If we hit an error, we want to recover promptly. */
@@ -1988,15 +2068,19 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
         int rc, timeout;
         ENTRY;
 
+        if (set->set_producer)
+                (void)ptlrpc_set_producer(set);
+        else
+                cfs_list_for_each(tmp, &set->set_requests) {
+                        req = cfs_list_entry(tmp, struct ptlrpc_request,
+                                             rq_set_chain);
+                        if (req->rq_phase == RQ_PHASE_NEW)
+                                (void)ptlrpc_send_new_req(req);
+                }
+
         if (cfs_list_empty(&set->set_requests))
                 RETURN(0);
 
-        cfs_list_for_each(tmp, &set->set_requests) {
-                req = cfs_list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-                if (req->rq_phase == RQ_PHASE_NEW)
-                        (void)ptlrpc_send_new_req(req);
-        }
-
         do {
                 timeout = ptlrpc_set_next_timeout(set);
 
@@ -2065,7 +2149,7 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
 
         LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
-        rc = 0;
+        rc = set->set_rc; /* rq_status of already freed requests if any */
         cfs_list_for_each(tmp, &set->set_requests) {
                 req = cfs_list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
-- 
1.8.3.1
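For reference, the flow control added to ptlrpc_set_wait() and
ptlrpc_check_set() behaves like the following standalone model
(illustrative C only, independent of the Lustre tree; MAX_INFLIGHT,
TOTAL_RPCS and the producer() helper are invented for the example):

    #include <stdio.h>

    #define MAX_INFLIGHT 4    /* plays the role of set_max_inflight */
    #define TOTAL_RPCS   10   /* total work the producer can generate */

    static int produced;

    /* Models a set_producer_func: "queues" one RPC (as if
     * ptlrpc_set_add_req() bumped set_remaining), or returns -1 to
     * stand in for -ENOENT when there is nothing left to produce. */
    static int producer(int *in_flight)
    {
            if (produced == TOTAL_RPCS)
                    return -1;
            produced++;
            (*in_flight)++;
            return 0;
    }

    int main(void)
    {
            int in_flight = 0, done = 0, have_producer = 1;

            /* initial fill, as done by ptlrpc_set_wait() */
            while (have_producer && in_flight < MAX_INFLIGHT)
                    if (producer(&in_flight) < 0)
                            have_producer = 0;

            /* completion loop, as done by ptlrpc_check_set(): each
             * finished RPC is immediately replaced by a fresh one, so
             * the in-flight count never exceeds MAX_INFLIGHT */
            while (in_flight > 0) {
                    in_flight--;    /* one RPC completes */
                    done++;
                    while (have_producer && in_flight < MAX_INFLIGHT)
                            if (producer(&in_flight) < 0)
                                    have_producer = 0;
                    printf("completed %2d, still in flight %d\n",
                           done, in_flight);
            }
            return 0;
    }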