struct ptlrpc_request_set;
typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
+typedef int (*set_producer_func)(struct ptlrpc_request_set *, void *);
/**
* Definition of request set structure.
* returned.
*/
struct ptlrpc_request_set {
- cfs_atomic_t set_refcount;
- /** number of in queue requests */
- cfs_atomic_t set_new_count;
- /** number of uncompleted requests */
- cfs_atomic_t set_remaining;
- /** wait queue to wait on for request events */
- cfs_waitq_t set_waitq;
- cfs_waitq_t *set_wakeup_ptr;
- /** List of requests in the set */
- cfs_list_t set_requests;
- /**
- * List of completion callbacks to be called when the set is completed
- * This is only used if \a set_interpret is NULL.
- * Links struct ptlrpc_set_cbdata.
- */
- cfs_list_t set_cblist;
- /** Completion callback, if only one. */
- set_interpreter_func set_interpret;
- /** opaq argument passed to completion \a set_interpret callback. */
- void *set_arg;
- /**
- * Lock for \a set_new_requests manipulations
- * locked so that any old caller can communicate requests to
- * the set holder who can then fold them into the lock-free set
- */
- cfs_spinlock_t set_new_req_lock;
- /** List of new yet unsent requests. Only used with ptlrpcd now. */
- cfs_list_t set_new_requests;
+ cfs_atomic_t set_refcount;
+ /** number of in queue requests */
+ cfs_atomic_t set_new_count;
+ /** number of uncompleted requests */
+ cfs_atomic_t set_remaining;
+ /** wait queue to wait on for request events */
+ cfs_waitq_t set_waitq;
+ cfs_waitq_t *set_wakeup_ptr;
+ /** List of requests in the set */
+ cfs_list_t set_requests;
+ /**
+ * List of completion callbacks to be called when the set is completed
+ * This is only used if \a set_interpret is NULL.
+ * Links struct ptlrpc_set_cbdata.
+ */
+ cfs_list_t set_cblist;
+ /** Completion callback, if only one. */
+ set_interpreter_func set_interpret;
+ /** opaque argument passed to completion \a set_interpret callback. */
+ void *set_arg;
+ /** rq_status of requests that have been freed already */
+ int set_rc;
+ /**
+ * Lock for \a set_new_requests manipulations
+ * locked so that any old caller can communicate requests to
+ * the set holder who can then fold them into the lock-free set
+ */
+ cfs_spinlock_t set_new_req_lock;
+ /** List of new yet unsent requests. Only used with ptlrpcd now. */
+ cfs_list_t set_new_requests;
+
+ /** Additional fields used by the flow control extension */
+ /** Maximum number of RPCs in flight */
+ int set_max_inflight;
+ /** Callback function used to generate RPCs */
+ set_producer_func set_producer;
+ /** opaque argument passed to the producer callback */
+ void *set_producer_arg;
};
/**
void ptlrpc_abort_set(struct ptlrpc_request_set *set);
struct ptlrpc_request_set *ptlrpc_prep_set(void);
+struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
+ void *arg);
int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
set_interpreter_func fn, void *data);
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
/* ldlm_lock.c */
struct ldlm_cb_set_arg {
- int type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
- unsigned int threshold; /* threshold to wake up the waiting proc */
- cfs_atomic_t rpcs; /* # of inflight rpcs in set */
- cfs_atomic_t restart;
- cfs_atomic_t refcount;
- cfs_waitq_t waitq;
+ struct ptlrpc_request_set *set;
+ int type; /* LDLM_{CP,BL}_CALLBACK */
+ cfs_atomic_t restart;
+ cfs_list_t *list;
};
-static inline void ldlm_csa_put(struct ldlm_cb_set_arg *arg)
-{
- if (cfs_atomic_dec_and_test(&arg->refcount)) {
- LASSERT(cfs_atomic_read(&arg->rpcs) == 0);
-
- OBD_FREE_PTR(arg);
- }
-}
-
typedef enum {
LDLM_WORK_BL_AST,
LDLM_WORK_CP_AST,
#endif
static int
-ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
+ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
- struct ldlm_lock_desc d;
- struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
- l_bl_ast);
- int rc;
- ENTRY;
+ struct ldlm_cb_set_arg *arg = opaq;
+ struct ldlm_lock_desc d;
+ int rc;
+ struct ldlm_lock *lock;
+ ENTRY;
- /* nobody should touch l_bl_ast */
- lock_res_and_lock(lock);
- cfs_list_del_init(&lock->l_bl_ast);
+ if (cfs_list_empty(arg->list))
+ RETURN(-ENOENT);
- LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
- LASSERT(lock->l_bl_ast_run == 0);
- LASSERT(lock->l_blocking_lock);
- lock->l_bl_ast_run++;
- unlock_res_and_lock(lock);
+ lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
- ldlm_lock2desc(lock->l_blocking_lock, &d);
+ /* nobody should touch l_bl_ast */
+ lock_res_and_lock(lock);
+ cfs_list_del_init(&lock->l_bl_ast);
- rc = lock->l_blocking_ast(lock, &d, (void *)arg,
- LDLM_CB_BLOCKING);
- LDLM_LOCK_RELEASE(lock->l_blocking_lock);
- lock->l_blocking_lock = NULL;
- LDLM_LOCK_RELEASE(lock);
+ LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+ LASSERT(lock->l_bl_ast_run == 0);
+ LASSERT(lock->l_blocking_lock);
+ lock->l_bl_ast_run++;
+ unlock_res_and_lock(lock);
- RETURN(rc);
-}
+ ldlm_lock2desc(lock->l_blocking_lock, &d);
-static int
-ldlm_work_cp_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
-{
- struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, l_cp_ast);
- ldlm_completion_callback completion_callback;
- int rc = 0;
- ENTRY;
+ rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
+ LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+ lock->l_blocking_lock = NULL;
+ LDLM_LOCK_RELEASE(lock);
- /* It's possible to receive a completion AST before we've set
- * the l_completion_ast pointer: either because the AST arrived
- * before the reply, or simply because there's a small race
- * window between receiving the reply and finishing the local
- * enqueue. (bug 842)
- *
- * This can't happen with the blocking_ast, however, because we
- * will never call the local blocking_ast until we drop our
- * reader/writer reference, which we won't do until we get the
- * reply and finish enqueueing. */
-
- /* nobody should touch l_cp_ast */
- lock_res_and_lock(lock);
- cfs_list_del_init(&lock->l_cp_ast);
- LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
- /* save l_completion_ast since it can be changed by
- * mds_intent_policy(), see bug 14225 */
- completion_callback = lock->l_completion_ast;
- lock->l_flags &= ~LDLM_FL_CP_REQD;
- unlock_res_and_lock(lock);
-
- if (completion_callback != NULL)
- rc = completion_callback(lock, 0, (void *)arg);
- LDLM_LOCK_RELEASE(lock);
+ RETURN(rc);
+}
- RETURN(rc);
+static int
+ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
+{
+ struct ldlm_cb_set_arg *arg = opaq;
+ int rc = 0;
+ struct ldlm_lock *lock;
+ ldlm_completion_callback completion_callback;
+ ENTRY;
+
+ if (cfs_list_empty(arg->list))
+ RETURN(-ENOENT);
+
+ lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
+
+ /* It's possible to receive a completion AST before we've set
+ * the l_completion_ast pointer: either because the AST arrived
+ * before the reply, or simply because there's a small race
+ * window between receiving the reply and finishing the local
+ * enqueue. (bug 842)
+ *
+ * This can't happen with the blocking_ast, however, because we
+ * will never call the local blocking_ast until we drop our
+ * reader/writer reference, which we won't do until we get the
+ * reply and finish enqueueing. */
+
+ /* nobody should touch l_cp_ast */
+ lock_res_and_lock(lock);
+ cfs_list_del_init(&lock->l_cp_ast);
+ LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
+ /* save l_completion_ast since it can be changed by
+ * mds_intent_policy(), see bug 14225 */
+ completion_callback = lock->l_completion_ast;
+ lock->l_flags &= ~LDLM_FL_CP_REQD;
+ unlock_res_and_lock(lock);
+
+ if (completion_callback != NULL)
+ rc = completion_callback(lock, 0, (void *)arg);
+ LDLM_LOCK_RELEASE(lock);
+
+ RETURN(rc);
}
static int
-ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
+ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
- struct ldlm_lock_desc desc;
- struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
- l_rk_ast);
- int rc;
- ENTRY;
+ struct ldlm_cb_set_arg *arg = opaq;
+ struct ldlm_lock_desc desc;
+ int rc;
+ struct ldlm_lock *lock;
+ ENTRY;
- cfs_list_del_init(&lock->l_rk_ast);
+ if (cfs_list_empty(arg->list))
+ RETURN(-ENOENT);
- /* the desc just pretend to exclusive */
- ldlm_lock2desc(lock, &desc);
- desc.l_req_mode = LCK_EX;
- desc.l_granted_mode = 0;
+ lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
+ cfs_list_del_init(&lock->l_rk_ast);
- rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
- LDLM_LOCK_RELEASE(lock);
+ /* the desc just pretends to be exclusive */
+ ldlm_lock2desc(lock, &desc);
+ desc.l_req_mode = LCK_EX;
+ desc.l_granted_mode = 0;
- RETURN(rc);
+ rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
+ LDLM_LOCK_RELEASE(lock);
+
+ RETURN(rc);
}
int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
ldlm_desc_ast_t ast_type)
{
- struct l_wait_info lwi = { 0 };
- struct ldlm_cb_set_arg *arg;
- cfs_list_t *tmp, *pos;
- int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
- unsigned int max_ast_count;
- int rc;
- ENTRY;
-
- if (cfs_list_empty(rpc_list))
- RETURN(0);
-
- OBD_ALLOC_PTR(arg);
- if (arg == NULL)
- RETURN(-ENOMEM);
-
- cfs_atomic_set(&arg->restart, 0);
- cfs_atomic_set(&arg->rpcs, 0);
- cfs_atomic_set(&arg->refcount, 1);
- cfs_waitq_init(&arg->waitq);
-
- switch (ast_type) {
- case LDLM_WORK_BL_AST:
- arg->type = LDLM_BL_CALLBACK;
- work_ast_lock = ldlm_work_bl_ast_lock;
- break;
- case LDLM_WORK_CP_AST:
- arg->type = LDLM_CP_CALLBACK;
- work_ast_lock = ldlm_work_cp_ast_lock;
- break;
- case LDLM_WORK_REVOKE_AST:
- arg->type = LDLM_BL_CALLBACK;
- work_ast_lock = ldlm_work_revoke_ast_lock;
- break;
- default:
- LBUG();
- }
-
- max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
- arg->threshold = max_ast_count;
-
- cfs_list_for_each_safe(tmp, pos, rpc_list) {
- (void)work_ast_lock(tmp, arg);
- if (cfs_atomic_read(&arg->rpcs) < max_ast_count)
- continue;
-
- l_wait_event(arg->waitq,
- cfs_atomic_read(&arg->rpcs) < arg->threshold,
- &lwi);
- }
-
- arg->threshold = 1;
- l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi);
-
- rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
- ldlm_csa_put(arg);
- RETURN(rc);
+ struct ldlm_cb_set_arg *arg;
+ set_producer_func work_ast_lock;
+ int rc;
+
+ if (cfs_list_empty(rpc_list))
+ RETURN(0);
+
+ OBD_ALLOC_PTR(arg);
+ if (arg == NULL)
+ RETURN(-ENOMEM);
+
+ cfs_atomic_set(&arg->restart, 0);
+ arg->list = rpc_list;
+
+ switch (ast_type) {
+ case LDLM_WORK_BL_AST:
+ arg->type = LDLM_BL_CALLBACK;
+ work_ast_lock = ldlm_work_bl_ast_lock;
+ break;
+ case LDLM_WORK_CP_AST:
+ arg->type = LDLM_CP_CALLBACK;
+ work_ast_lock = ldlm_work_cp_ast_lock;
+ break;
+ case LDLM_WORK_REVOKE_AST:
+ arg->type = LDLM_BL_CALLBACK;
+ work_ast_lock = ldlm_work_revoke_ast_lock;
+ break;
+ default:
+ LBUG();
+ }
+
+ /* We create a ptlrpc request set with flow control extension.
+ * This request set will use the work_ast_lock function to produce new
+ * requests and will send a new request each time one completes in order
+ * to keep the number of requests in flight to ns_max_parallel_ast */
+ arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
+ work_ast_lock, arg);
+ if (arg->set == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ ptlrpc_set_wait(arg->set);
+ ptlrpc_set_destroy(arg->set);
+
+ rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
+ GOTO(out, rc);
+out:
+ OBD_FREE_PTR(arg);
+ return rc;
}
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
}
LDLM_LOCK_RELEASE(lock);
- if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
- cfs_waitq_signal(&arg->waitq);
-
- ldlm_csa_put(arg);
RETURN(0);
}
struct ldlm_lock *lock,
int instant_cancel)
{
- int rc = 0;
- ENTRY;
-
- if (unlikely(instant_cancel)) {
- rc = ptl_send_rpc(req, 1);
- ptlrpc_req_finished(req);
- if (rc == 0)
- cfs_atomic_inc(&arg->restart);
- } else {
- LDLM_LOCK_GET(lock);
- cfs_atomic_inc(&arg->rpcs);
- cfs_atomic_inc(&arg->refcount);
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
- }
+ int rc = 0;
+ ENTRY;
+
+ if (unlikely(instant_cancel)) {
+ rc = ptl_send_rpc(req, 1);
+ ptlrpc_req_finished(req);
+ if (rc == 0)
+ cfs_atomic_inc(&arg->restart);
+ } else {
+ LDLM_LOCK_GET(lock);
+ ptlrpc_set_add_req(arg->set, req);
+ }
- RETURN(rc);
+ RETURN(rc);
}
/**
#include "ptlrpc_internal.h"
+static int ptlrpc_send_new_req(struct ptlrpc_request *req);
+
/**
* Initialize passed in client structure \a cl.
*/
*/
struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
- struct ptlrpc_request_set *set;
+ struct ptlrpc_request_set *set;
- ENTRY;
- OBD_ALLOC(set, sizeof *set);
- if (!set)
- RETURN(NULL);
- cfs_atomic_set(&set->set_refcount, 1);
- CFS_INIT_LIST_HEAD(&set->set_requests);
- cfs_waitq_init(&set->set_waitq);
- cfs_atomic_set(&set->set_new_count, 0);
- cfs_atomic_set(&set->set_remaining, 0);
- cfs_spin_lock_init(&set->set_new_req_lock);
- CFS_INIT_LIST_HEAD(&set->set_new_requests);
- CFS_INIT_LIST_HEAD(&set->set_cblist);
+ ENTRY;
+ OBD_ALLOC(set, sizeof *set);
+ if (!set)
+ RETURN(NULL);
+ cfs_atomic_set(&set->set_refcount, 1);
+ CFS_INIT_LIST_HEAD(&set->set_requests);
+ cfs_waitq_init(&set->set_waitq);
+ cfs_atomic_set(&set->set_new_count, 0);
+ cfs_atomic_set(&set->set_remaining, 0);
+ cfs_spin_lock_init(&set->set_new_req_lock);
+ CFS_INIT_LIST_HEAD(&set->set_new_requests);
+ CFS_INIT_LIST_HEAD(&set->set_cblist);
+ set->set_max_inflight = UINT_MAX;
+ set->set_producer = NULL;
+ set->set_producer_arg = NULL;
+ set->set_rc = 0;
- RETURN(set);
+ RETURN(set);
}
/**
+ * Allocate and initialize new request set structure with flow control
+ * extension. This extension allows to control the number of requests in-flight
+ * for the whole set. A callback function to generate requests must be provided
+ * and the request set will keep the number of requests sent over the wire to
+ * \a max.
+ * Returns a pointer to the newly allocated set structure or NULL on error.
+ */
+struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
+ void *arg)
+
+{
+ struct ptlrpc_request_set *set;
+
+ set = ptlrpc_prep_set();
+ if (!set)
+ RETURN(NULL);
+
+ set->set_max_inflight = max;
+ set->set_producer = func;
+ set->set_producer_arg = arg;
+
+ RETURN(set);
+}
+EXPORT_SYMBOL(ptlrpc_prep_fcset);
+
+/**
* Wind down and free request set structure previously allocated with
* ptlrpc_prep_set.
* Ensures that all requests on the set have completed and removes
struct ptlrpc_request *req)
{
char jobid[JOBSTATS_JOBID_SIZE];
- LASSERT(cfs_list_empty(&req->rq_set_chain));
+ LASSERT(cfs_list_empty(&req->rq_set_chain));
- /* The set takes over the caller's request reference */
- cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
- req->rq_set = set;
- cfs_atomic_inc(&set->set_remaining);
- req->rq_queued_time = cfs_time_current();
+ /* The set takes over the caller's request reference */
+ cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
+ req->rq_set = set;
+ cfs_atomic_inc(&set->set_remaining);
+ req->rq_queued_time = cfs_time_current();
if (req->rq_reqmsg) {
lustre_get_jobid(jobid);
lustre_msg_set_jobid(req->rq_reqmsg, jobid);
}
+
+ if (set->set_producer != NULL)
+ /* If the request set has a producer callback, the RPC must be
+ * sent straight away */
+ ptlrpc_send_new_req(req);
}
/**
RETURN(0);
}
+static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
+{
+ int remaining, rc;
+ ENTRY;
+
+ LASSERT(set->set_producer != NULL);
+
+ remaining = cfs_atomic_read(&set->set_remaining);
+
+ /* populate the ->set_requests list with requests until we
+ * reach the maximum number of RPCs in flight for this set */
+ while (cfs_atomic_read(&set->set_remaining) < set->set_max_inflight) {
+ rc = set->set_producer(set, set->set_producer_arg);
+ if (rc == -ENOENT) {
+ /* no more RPC to produce */
+ set->set_producer = NULL;
+ set->set_producer_arg = NULL;
+ RETURN(0);
+ }
+ }
+
+ RETURN((cfs_atomic_read(&set->set_remaining) - remaining));
+}
+
/**
* this sends any unsent RPCs in \a set and returns 1 if all are sent
* and no more replies are expected.
*/
int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
{
- cfs_list_t *tmp;
+ cfs_list_t *tmp, *next;
int force_timer_recalc = 0;
ENTRY;
if (cfs_atomic_read(&set->set_remaining) == 0)
RETURN(1);
- cfs_list_for_each(tmp, &set->set_requests) {
+ cfs_list_for_each_safe(tmp, next, &set->set_requests) {
struct ptlrpc_request *req =
cfs_list_entry(tmp, struct ptlrpc_request,
rq_set_chain);
cfs_atomic_dec(&set->set_remaining);
cfs_waitq_broadcast(&imp->imp_recovery_waitq);
+
+ if (set->set_producer) {
+ /* produce a new request if possible */
+ if (ptlrpc_set_producer(set) > 0)
+ force_timer_recalc = 1;
+
+ /* free the request that has just been completed
+ * in order not to pollute set->set_requests */
+ cfs_list_del_init(&req->rq_set_chain);
+ cfs_spin_lock(&req->rq_lock);
+ req->rq_set = NULL;
+ req->rq_invalid_rqset = 0;
+ cfs_spin_unlock(&req->rq_lock);
+
+ /* record rq_status to compute the final status later */
+ if (req->rq_status != 0)
+ set->set_rc = req->rq_status;
+ ptlrpc_req_finished(req);
+ }
}
/* If we hit an error, we want to recover promptly. */
int rc, timeout;
ENTRY;
+ if (set->set_producer)
+ (void)ptlrpc_set_producer(set);
+ else
+ cfs_list_for_each(tmp, &set->set_requests) {
+ req = cfs_list_entry(tmp, struct ptlrpc_request,
+ rq_set_chain);
+ if (req->rq_phase == RQ_PHASE_NEW)
+ (void)ptlrpc_send_new_req(req);
+ }
+
if (cfs_list_empty(&set->set_requests))
RETURN(0);
- cfs_list_for_each(tmp, &set->set_requests) {
- req = cfs_list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- if (req->rq_phase == RQ_PHASE_NEW)
- (void)ptlrpc_send_new_req(req);
- }
-
do {
timeout = ptlrpc_set_next_timeout(set);
LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
- rc = 0;
+ rc = set->set_rc; /* rq_status of already freed requests if any */
cfs_list_for_each(tmp, &set->set_requests) {
req = cfs_list_entry(tmp, struct ptlrpc_request, rq_set_chain);