Whamcloud - gitweb
LU-1373 ptlrpc: add flow control extension to ptlrpc req set
authorJohann Lombardi <johann@whamcloud.com>
Fri, 4 May 2012 15:53:44 +0000 (17:53 +0200)
committerOleg Drokin <green@whamcloud.com>
Mon, 11 Jun 2012 13:34:08 +0000 (09:34 -0400)
This patch allows new requests to be added to a request set while
this latter has already requests in flight. This is done by adding a
callback function invoked by the request set to generate RPCs.
The request set will fire a new RPC each time one completes, keeping
the number of RPCs in flight equals to set->set_max_inflight.

Lock callbacks can thus be sent by the service thread again. This
avoids doing disk I/O from the ptlrpcd context.

Signed-off-by: Johann Lombardi <johann@whamcloud.com>
Change-Id: If95922fa8da1dfa7ce7c98d6bfe35e9a5f5bb34f
Reviewed-on: http://review.whamcloud.com/2650
Tested-by: Hudson
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_net.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ptlrpc/client.c

index 7b47587..4d8341d 100644 (file)
@@ -247,6 +247,7 @@ union ptlrpc_async_args {
 
 struct ptlrpc_request_set;
 typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
+typedef int (*set_producer_func)(struct ptlrpc_request_set *, void *);
 
 /**
  * Definition of request set structure.
@@ -260,34 +261,44 @@ typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
  * returned.
  */
 struct ptlrpc_request_set {
-        cfs_atomic_t          set_refcount;
-        /** number of in queue requests */
-        cfs_atomic_t          set_new_count;
-        /** number of uncompleted requests */
-        cfs_atomic_t          set_remaining;
-        /** wait queue to wait on for request events */
-        cfs_waitq_t           set_waitq;
-        cfs_waitq_t          *set_wakeup_ptr;
-        /** List of requests in the set */
-        cfs_list_t            set_requests;
-        /**
-         * List of completion callbacks to be called when the set is completed
-         * This is only used if \a set_interpret is NULL.
-         * Links struct ptlrpc_set_cbdata.
-         */
-        cfs_list_t            set_cblist;
-        /** Completion callback, if only one. */
-        set_interpreter_func  set_interpret;
-        /** opaq argument passed to completion \a set_interpret callback. */
-        void                 *set_arg;
-        /**
-         * Lock for \a set_new_requests manipulations
-         * locked so that any old caller can communicate requests to
-         * the set holder who can then fold them into the lock-free set
-         */
-        cfs_spinlock_t        set_new_req_lock;
-        /** List of new yet unsent requests. Only used with ptlrpcd now. */
-        cfs_list_t            set_new_requests;
+       cfs_atomic_t          set_refcount;
+       /** number of in queue requests */
+       cfs_atomic_t          set_new_count;
+       /** number of uncompleted requests */
+       cfs_atomic_t          set_remaining;
+       /** wait queue to wait on for request events */
+       cfs_waitq_t           set_waitq;
+       cfs_waitq_t          *set_wakeup_ptr;
+       /** List of requests in the set */
+       cfs_list_t            set_requests;
+       /**
+        * List of completion callbacks to be called when the set is completed
+        * This is only used if \a set_interpret is NULL.
+        * Links struct ptlrpc_set_cbdata.
+        */
+       cfs_list_t            set_cblist;
+       /** Completion callback, if only one. */
+       set_interpreter_func  set_interpret;
+       /** opaq argument passed to completion \a set_interpret callback. */
+       void                 *set_arg;
+       /** rq_status of requests that have been freed already */
+       int                   set_rc;
+       /**
+        * Lock for \a set_new_requests manipulations
+        * locked so that any old caller can communicate requests to
+        * the set holder who can then fold them into the lock-free set
+        */
+       cfs_spinlock_t        set_new_req_lock;
+       /** List of new yet unsent requests. Only used with ptlrpcd now. */
+       cfs_list_t            set_new_requests;
+
+       /** Additional fields used by the flow control extension */
+       /** Maximum number of RPCs in flight */
+       int                   set_max_inflight;
+       /** Callback function used to generate RPCs */
+       set_producer_func     set_producer;
+       /** opaq argument passed to the producer callback */
+       void                 *set_producer_arg;
 };
 
 /**
@@ -1480,6 +1491,8 @@ void ptlrpc_cleanup_imp(struct obd_import *imp);
 void ptlrpc_abort_set(struct ptlrpc_request_set *set);
 
 struct ptlrpc_request_set *ptlrpc_prep_set(void);
+struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
+                                            void *arg);
 int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
                       set_interpreter_func fn, void *data);
 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
index 8f7cb5c..55d694b 100644 (file)
@@ -95,23 +95,12 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns);
 /* ldlm_lock.c */
 
 struct ldlm_cb_set_arg {
-        int          type;      /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
-        unsigned int threshold; /* threshold to wake up the waiting proc */
-        cfs_atomic_t rpcs;      /* # of inflight rpcs in set */
-        cfs_atomic_t restart;
-        cfs_atomic_t refcount;
-        cfs_waitq_t  waitq;
+       struct ptlrpc_request_set *set;
+       int                        type; /* LDLM_{CP,BL}_CALLBACK */
+       cfs_atomic_t               restart;
+       cfs_list_t                *list;
 };
 
-static inline void ldlm_csa_put(struct ldlm_cb_set_arg *arg)
-{
-        if (cfs_atomic_dec_and_test(&arg->refcount)) {
-                LASSERT(cfs_atomic_read(&arg->rpcs) == 0);
-
-                OBD_FREE_PTR(arg);
-        }
-}
-
 typedef enum {
         LDLM_WORK_BL_AST,
         LDLM_WORK_CP_AST,
index 2b4392f..34ac565 100644 (file)
@@ -1514,152 +1514,158 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
 #endif
 
 static int
-ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
+ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
-        struct ldlm_lock_desc d;
-        struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
-                                                l_bl_ast);
-        int rc;
-        ENTRY;
+       struct ldlm_cb_set_arg *arg = opaq;
+       struct ldlm_lock_desc   d;
+       int                     rc;
+       struct ldlm_lock       *lock;
+       ENTRY;
 
-        /* nobody should touch l_bl_ast */
-        lock_res_and_lock(lock);
-        cfs_list_del_init(&lock->l_bl_ast);
+       if (cfs_list_empty(arg->list))
+               RETURN(-ENOENT);
 
-        LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
-        LASSERT(lock->l_bl_ast_run == 0);
-        LASSERT(lock->l_blocking_lock);
-        lock->l_bl_ast_run++;
-        unlock_res_and_lock(lock);
+       lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
 
-        ldlm_lock2desc(lock->l_blocking_lock, &d);
+       /* nobody should touch l_bl_ast */
+       lock_res_and_lock(lock);
+       cfs_list_del_init(&lock->l_bl_ast);
 
-        rc = lock->l_blocking_ast(lock, &d, (void *)arg,
-                                  LDLM_CB_BLOCKING);
-        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-        lock->l_blocking_lock = NULL;
-        LDLM_LOCK_RELEASE(lock);
+       LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+       LASSERT(lock->l_bl_ast_run == 0);
+       LASSERT(lock->l_blocking_lock);
+       lock->l_bl_ast_run++;
+       unlock_res_and_lock(lock);
 
-        RETURN(rc);
-}
+       ldlm_lock2desc(lock->l_blocking_lock, &d);
 
-static int
-ldlm_work_cp_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
-{
-        struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, l_cp_ast);
-        ldlm_completion_callback completion_callback;
-        int rc = 0;
-        ENTRY;
+       rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
+       LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+       lock->l_blocking_lock = NULL;
+       LDLM_LOCK_RELEASE(lock);
 
-        /* It's possible to receive a completion AST before we've set
-         * the l_completion_ast pointer: either because the AST arrived
-         * before the reply, or simply because there's a small race
-         * window between receiving the reply and finishing the local
-         * enqueue. (bug 842)
-         *
-         * This can't happen with the blocking_ast, however, because we
-         * will never call the local blocking_ast until we drop our
-         * reader/writer reference, which we won't do until we get the
-         * reply and finish enqueueing. */
-
-        /* nobody should touch l_cp_ast */
-        lock_res_and_lock(lock);
-        cfs_list_del_init(&lock->l_cp_ast);
-        LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
-        /* save l_completion_ast since it can be changed by
-         * mds_intent_policy(), see bug 14225 */
-        completion_callback = lock->l_completion_ast;
-        lock->l_flags &= ~LDLM_FL_CP_REQD;
-        unlock_res_and_lock(lock);
-
-        if (completion_callback != NULL)
-                rc = completion_callback(lock, 0, (void *)arg);
-        LDLM_LOCK_RELEASE(lock);
+       RETURN(rc);
+}
 
-        RETURN(rc);
+static int
+ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
+{
+       struct ldlm_cb_set_arg  *arg = opaq;
+       int                      rc = 0;
+       struct ldlm_lock        *lock;
+       ldlm_completion_callback completion_callback;
+       ENTRY;
+
+       if (cfs_list_empty(arg->list))
+               RETURN(-ENOENT);
+
+       lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
+
+       /* It's possible to receive a completion AST before we've set
+        * the l_completion_ast pointer: either because the AST arrived
+        * before the reply, or simply because there's a small race
+        * window between receiving the reply and finishing the local
+        * enqueue. (bug 842)
+        *
+        * This can't happen with the blocking_ast, however, because we
+        * will never call the local blocking_ast until we drop our
+        * reader/writer reference, which we won't do until we get the
+        * reply and finish enqueueing. */
+
+       /* nobody should touch l_cp_ast */
+       lock_res_and_lock(lock);
+       cfs_list_del_init(&lock->l_cp_ast);
+       LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
+       /* save l_completion_ast since it can be changed by
+        * mds_intent_policy(), see bug 14225 */
+       completion_callback = lock->l_completion_ast;
+       lock->l_flags &= ~LDLM_FL_CP_REQD;
+       unlock_res_and_lock(lock);
+
+       if (completion_callback != NULL)
+               rc = completion_callback(lock, 0, (void *)arg);
+       LDLM_LOCK_RELEASE(lock);
+
+       RETURN(rc);
 }
 
 static int
-ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
+ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
-        struct ldlm_lock_desc desc;
-        struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
-                                                l_rk_ast);
-        int rc;
-        ENTRY;
+       struct ldlm_cb_set_arg *arg = opaq;
+       struct ldlm_lock_desc   desc;
+       int                     rc;
+       struct ldlm_lock       *lock;
+       ENTRY;
 
-        cfs_list_del_init(&lock->l_rk_ast);
+       if (cfs_list_empty(arg->list))
+               RETURN(-ENOENT);
 
-        /* the desc just pretend to exclusive */
-        ldlm_lock2desc(lock, &desc);
-        desc.l_req_mode = LCK_EX;
-        desc.l_granted_mode = 0;
+       lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
+       cfs_list_del_init(&lock->l_rk_ast);
 
-        rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
-        LDLM_LOCK_RELEASE(lock);
+       /* the desc just pretend to exclusive */
+       ldlm_lock2desc(lock, &desc);
+       desc.l_req_mode = LCK_EX;
+       desc.l_granted_mode = 0;
 
-        RETURN(rc);
+       rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
+       LDLM_LOCK_RELEASE(lock);
+
+       RETURN(rc);
 }
 
 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type)
 {
-        struct l_wait_info     lwi = { 0 };
-        struct ldlm_cb_set_arg *arg;
-        cfs_list_t *tmp, *pos;
-        int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
-        unsigned int max_ast_count;
-        int rc;
-        ENTRY;
-
-        if (cfs_list_empty(rpc_list))
-                RETURN(0);
-
-        OBD_ALLOC_PTR(arg);
-        if (arg == NULL)
-                RETURN(-ENOMEM);
-
-        cfs_atomic_set(&arg->restart, 0);
-        cfs_atomic_set(&arg->rpcs, 0);
-        cfs_atomic_set(&arg->refcount, 1);
-        cfs_waitq_init(&arg->waitq);
-
-        switch (ast_type) {
-        case LDLM_WORK_BL_AST:
-                arg->type = LDLM_BL_CALLBACK;
-                work_ast_lock = ldlm_work_bl_ast_lock;
-                break;
-        case LDLM_WORK_CP_AST:
-                arg->type = LDLM_CP_CALLBACK;
-                work_ast_lock = ldlm_work_cp_ast_lock;
-                break;
-        case LDLM_WORK_REVOKE_AST:
-                arg->type = LDLM_BL_CALLBACK;
-                work_ast_lock = ldlm_work_revoke_ast_lock;
-                break;
-        default:
-                LBUG();
-        }
-
-        max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
-        arg->threshold = max_ast_count;
-
-        cfs_list_for_each_safe(tmp, pos, rpc_list) {
-                (void)work_ast_lock(tmp, arg);
-                if (cfs_atomic_read(&arg->rpcs) < max_ast_count)
-                        continue;
-
-                l_wait_event(arg->waitq,
-                             cfs_atomic_read(&arg->rpcs) < arg->threshold,
-                             &lwi);
-        }
-
-        arg->threshold = 1;
-        l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi);
-
-        rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
-        ldlm_csa_put(arg);
-        RETURN(rc);
+       struct ldlm_cb_set_arg *arg;
+       set_producer_func       work_ast_lock;
+       int                     rc;
+
+       if (cfs_list_empty(rpc_list))
+               RETURN(0);
+
+       OBD_ALLOC_PTR(arg);
+       if (arg == NULL)
+               RETURN(-ENOMEM);
+
+       cfs_atomic_set(&arg->restart, 0);
+       arg->list = rpc_list;
+
+       switch (ast_type) {
+               case LDLM_WORK_BL_AST:
+                       arg->type = LDLM_BL_CALLBACK;
+                       work_ast_lock = ldlm_work_bl_ast_lock;
+                       break;
+               case LDLM_WORK_CP_AST:
+                       arg->type = LDLM_CP_CALLBACK;
+                       work_ast_lock = ldlm_work_cp_ast_lock;
+                       break;
+               case LDLM_WORK_REVOKE_AST:
+                       arg->type = LDLM_BL_CALLBACK;
+                       work_ast_lock = ldlm_work_revoke_ast_lock;
+                       break;
+               default:
+                       LBUG();
+       }
+
+       /* We create a ptlrpc request set with flow control extension.
+        * This request set will use the work_ast_lock function to produce new
+        * requests and will send a new request each time one completes in order
+        * to keep the number of requests in flight to ns_max_parallel_ast */
+       arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
+                                    work_ast_lock, arg);
+       if (arg->set == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       ptlrpc_set_wait(arg->set);
+       ptlrpc_set_destroy(arg->set);
+
+       rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
+       GOTO(out, rc);
+out:
+       OBD_FREE_PTR(arg);
+       return rc;
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
index 29a8683..2c65a42 100644 (file)
@@ -677,10 +677,6 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         }
         LDLM_LOCK_RELEASE(lock);
 
-        if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
-                cfs_waitq_signal(&arg->waitq);
-
-        ldlm_csa_put(arg);
         RETURN(0);
 }
 
@@ -689,22 +685,20 @@ static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                                           struct ldlm_lock *lock,
                                           int instant_cancel)
 {
-        int rc = 0;
-        ENTRY;
-
-        if (unlikely(instant_cancel)) {
-                rc = ptl_send_rpc(req, 1);
-                ptlrpc_req_finished(req);
-                if (rc == 0)
-                        cfs_atomic_inc(&arg->restart);
-        } else {
-                LDLM_LOCK_GET(lock);
-                cfs_atomic_inc(&arg->rpcs);
-                cfs_atomic_inc(&arg->refcount);
-                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
-        }
+       int rc = 0;
+       ENTRY;
+
+       if (unlikely(instant_cancel)) {
+               rc = ptl_send_rpc(req, 1);
+               ptlrpc_req_finished(req);
+               if (rc == 0)
+                       cfs_atomic_inc(&arg->restart);
+       } else {
+               LDLM_LOCK_GET(lock);
+               ptlrpc_set_add_req(arg->set, req);
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 /**
index 71e8e3a..261e50d 100644 (file)
@@ -52,6 +52,8 @@
 
 #include "ptlrpc_internal.h"
 
+static int ptlrpc_send_new_req(struct ptlrpc_request *req);
+
 /**
  * Initialize passed in client structure \a cl.
  */
@@ -869,25 +871,55 @@ void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
  */
 struct ptlrpc_request_set *ptlrpc_prep_set(void)
 {
-        struct ptlrpc_request_set *set;
+       struct ptlrpc_request_set *set;
 
-        ENTRY;
-        OBD_ALLOC(set, sizeof *set);
-        if (!set)
-                RETURN(NULL);
-        cfs_atomic_set(&set->set_refcount, 1);
-        CFS_INIT_LIST_HEAD(&set->set_requests);
-        cfs_waitq_init(&set->set_waitq);
-        cfs_atomic_set(&set->set_new_count, 0);
-        cfs_atomic_set(&set->set_remaining, 0);
-        cfs_spin_lock_init(&set->set_new_req_lock);
-        CFS_INIT_LIST_HEAD(&set->set_new_requests);
-        CFS_INIT_LIST_HEAD(&set->set_cblist);
+       ENTRY;
+       OBD_ALLOC(set, sizeof *set);
+       if (!set)
+               RETURN(NULL);
+       cfs_atomic_set(&set->set_refcount, 1);
+       CFS_INIT_LIST_HEAD(&set->set_requests);
+       cfs_waitq_init(&set->set_waitq);
+       cfs_atomic_set(&set->set_new_count, 0);
+       cfs_atomic_set(&set->set_remaining, 0);
+       cfs_spin_lock_init(&set->set_new_req_lock);
+       CFS_INIT_LIST_HEAD(&set->set_new_requests);
+       CFS_INIT_LIST_HEAD(&set->set_cblist);
+       set->set_max_inflight = UINT_MAX;
+       set->set_producer     = NULL;
+       set->set_producer_arg = NULL;
+       set->set_rc           = 0;
 
-        RETURN(set);
+       RETURN(set);
 }
 
 /**
+ * Allocate and initialize new request set structure with flow control
+ * extension. This extension allows to control the number of requests in-flight
+ * for the whole set. A callback function to generate requests must be provided
+ * and the request set will keep the number of requests sent over the wire to
+ * @max_inflight.
+ * Returns a pointer to the newly allocated set structure or NULL on error.
+ */
+struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
+                                            void *arg)
+
+{
+       struct ptlrpc_request_set *set;
+
+       set = ptlrpc_prep_set();
+       if (!set)
+               RETURN(NULL);
+
+       set->set_max_inflight  = max;
+       set->set_producer      = func;
+       set->set_producer_arg  = arg;
+
+       RETURN(set);
+}
+EXPORT_SYMBOL(ptlrpc_prep_fcset);
+
+/**
  * Wind down and free request set structure previously allocated with
  * ptlrpc_prep_set.
  * Ensures that all requests on the set have completed and removes
@@ -975,18 +1007,23 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
        char jobid[JOBSTATS_JOBID_SIZE];
-        LASSERT(cfs_list_empty(&req->rq_set_chain));
+       LASSERT(cfs_list_empty(&req->rq_set_chain));
 
-        /* The set takes over the caller's request reference */
-        cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
-        req->rq_set = set;
-        cfs_atomic_inc(&set->set_remaining);
-        req->rq_queued_time = cfs_time_current();
+       /* The set takes over the caller's request reference */
+       cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
+       req->rq_set = set;
+       cfs_atomic_inc(&set->set_remaining);
+       req->rq_queued_time = cfs_time_current();
 
        if (req->rq_reqmsg) {
                lustre_get_jobid(jobid);
                lustre_msg_set_jobid(req->rq_reqmsg, jobid);
        }
+
+       if (set->set_producer != NULL)
+               /* If the request set has a producer callback, the RPC must be
+                * sent straight away */
+               ptlrpc_send_new_req(req);
 }
 
 /**
@@ -1421,6 +1458,30 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
+{
+       int remaining, rc;
+       ENTRY;
+
+       LASSERT(set->set_producer != NULL);
+
+       remaining = cfs_atomic_read(&set->set_remaining);
+
+       /* populate the ->set_requests list with requests until we
+        * reach the maximum number of RPCs in flight for this set */
+       while (cfs_atomic_read(&set->set_remaining) < set->set_max_inflight) {
+               rc = set->set_producer(set, set->set_producer_arg);
+               if (rc == -ENOENT) {
+                       /* no more RPC to produce */
+                       set->set_producer     = NULL;
+                       set->set_producer_arg = NULL;
+                       RETURN(0);
+               }
+       }
+
+       RETURN((cfs_atomic_read(&set->set_remaining) - remaining));
+}
+
 /**
  * this sends any unsent RPCs in \a set and returns 1 if all are sent
  * and no more replies are expected.
@@ -1429,14 +1490,14 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
  */
 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-        cfs_list_t *tmp;
+        cfs_list_t *tmp, *next;
         int force_timer_recalc = 0;
         ENTRY;
 
         if (cfs_atomic_read(&set->set_remaining) == 0)
                 RETURN(1);
 
-        cfs_list_for_each(tmp, &set->set_requests) {
+        cfs_list_for_each_safe(tmp, next, &set->set_requests) {
                 struct ptlrpc_request *req =
                         cfs_list_entry(tmp, struct ptlrpc_request,
                                        rq_set_chain);
@@ -1759,6 +1820,25 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                 cfs_atomic_dec(&set->set_remaining);
                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
+
+               if (set->set_producer) {
+                       /* produce a new request if possible */
+                       if (ptlrpc_set_producer(set) > 0)
+                               force_timer_recalc = 1;
+
+                       /* free the request that has just been completed
+                        * in order not to pollute set->set_requests */
+                       cfs_list_del_init(&req->rq_set_chain);
+                       cfs_spin_lock(&req->rq_lock);
+                       req->rq_set = NULL;
+                       req->rq_invalid_rqset = 0;
+                       cfs_spin_unlock(&req->rq_lock);
+
+                       /* record rq_status to compute the final status later */
+                       if (req->rq_status != 0)
+                               set->set_rc = req->rq_status;
+                       ptlrpc_req_finished(req);
+               }
         }
 
         /* If we hit an error, we want to recover promptly. */
@@ -1988,15 +2068,19 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
         int                    rc, timeout;
         ENTRY;
 
+       if (set->set_producer)
+               (void)ptlrpc_set_producer(set);
+       else
+               cfs_list_for_each(tmp, &set->set_requests) {
+                       req = cfs_list_entry(tmp, struct ptlrpc_request,
+                                            rq_set_chain);
+                       if (req->rq_phase == RQ_PHASE_NEW)
+                               (void)ptlrpc_send_new_req(req);
+               }
+
         if (cfs_list_empty(&set->set_requests))
                 RETURN(0);
 
-        cfs_list_for_each(tmp, &set->set_requests) {
-                req = cfs_list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-                if (req->rq_phase == RQ_PHASE_NEW)
-                        (void)ptlrpc_send_new_req(req);
-        }
-
         do {
                 timeout = ptlrpc_set_next_timeout(set);
 
@@ -2065,7 +2149,7 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
 
         LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
-        rc = 0;
+        rc = set->set_rc; /* rq_status of already freed requests if any */
         cfs_list_for_each(tmp, &set->set_requests) {
                 req = cfs_list_entry(tmp, struct ptlrpc_request, rq_set_chain);