RETURN(rc);
}
+/* Helper function for the pair ldlm_run_{bl,cp}_ast_work().
+ *
+ * Send an existing rpc set specified by @arg->set and then
+ * destroy it. Create a new one if the @do_create flag is set. */
+static void
+ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
+{
+ int rc;
+
+ rc = ptlrpc_set_wait(arg->set);
+ if (arg->type == LDLM_BL_CALLBACK)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
+ ptlrpc_set_destroy(arg->set);
+
+ if (do_create)
+ arg->set = ptlrpc_prep_set();
+}
+
int ldlm_run_bl_ast_work(struct list_head *rpc_list)
{
+ struct ldlm_cb_set_arg arg;
struct list_head *tmp, *pos;
struct ldlm_lock_desc d;
- int rc = 0, retval = 0;
+ int ast_count;
+ int rc = 0;
ENTRY;
+ arg.set = ptlrpc_prep_set();
+ atomic_set(&arg.restart, 0);
+ arg.type = LDLM_BL_CALLBACK;
+
+ ast_count = 0;
list_for_each_safe(tmp, pos, rpc_list) {
struct ldlm_lock *lock =
list_entry(tmp, struct ldlm_lock, l_bl_ast);
LDLM_LOCK_PUT(lock->l_blocking_lock);
lock->l_blocking_lock = NULL;
- rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING);
-
- if (rc == -ERESTART)
- retval = rc;
- else if (rc)
- CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
- "disconnect client\n");
+ rc = lock->l_blocking_ast(lock, &d, (void *)&arg,
+ LDLM_CB_BLOCKING);
LDLM_LOCK_PUT(lock);
+ ast_count++;
+
+ /* Send the request set once it reaches PARALLEL_AST_LIMIT,
+ * and create a new set for the requests remaining in
+ * @rpc_list */
+ if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
+ ldlm_send_and_maybe_create_set(&arg, 1);
+ ast_count = 0;
+ }
}
- RETURN(retval);
+
+ if (ast_count > 0)
+ ldlm_send_and_maybe_create_set(&arg, 0);
+ else
+ /* In case the number of ASTs is a multiple of
+ * PARALLEL_AST_LIMIT, or @rpc_list was initially empty,
+ * @arg.set must be destroyed here, otherwise the
+ * request set would be leaked. */
+ ptlrpc_set_destroy(arg.set);
+
+ RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
}
int ldlm_run_cp_ast_work(struct list_head *rpc_list)
{
+ struct ldlm_cb_set_arg arg;
struct list_head *tmp, *pos;
- int rc = 0, retval = 0;
+ int ast_count;
+ int rc = 0;
ENTRY;
+ arg.set = ptlrpc_prep_set();
+ atomic_set(&arg.restart, 0);
+ arg.type = LDLM_CP_CALLBACK;
+
/* It's possible to receive a completion AST before we've set
* the l_completion_ast pointer: either because the AST arrived
* before the reply, or simply because there's a small race
* reader/writer reference, which we won't do until we get the
* reply and finish enqueueing. */
+ ast_count = 0;
list_for_each_safe(tmp, pos, rpc_list) {
struct ldlm_lock *lock =
list_entry(tmp, struct ldlm_lock, l_cp_ast);
lock->l_flags &= ~LDLM_FL_CP_REQD;
unlock_res_and_lock(lock);
- if (lock->l_completion_ast != NULL)
- rc = lock->l_completion_ast(lock, 0, 0);
- if (rc == -ERESTART)
- retval = rc;
- else if (rc)
- CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
- "disconnect client\n");
+ if (lock->l_completion_ast != NULL) {
+ rc = lock->l_completion_ast(lock, 0, (void *)&arg);
+ ast_count++;
+ }
LDLM_LOCK_PUT(lock);
+
+ /* Send the request set once it reaches PARALLEL_AST_LIMIT,
+ * and create a new set for the requests remaining in
+ * @rpc_list */
+ if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
+ ldlm_send_and_maybe_create_set(&arg, 1);
+ ast_count = 0;
+ }
}
- RETURN(retval);
+
+ if (ast_count > 0)
+ ldlm_send_and_maybe_create_set(&arg, 0);
+ else
+ /* In case the number of ASTs is a multiple of
+ * PARALLEL_AST_LIMIT, or @rpc_list was initially empty,
+ * @arg.set must be destroyed here, otherwise the
+ * request set would be leaked. */
+ ptlrpc_set_destroy(arg.set);
+
+ RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
}
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
return rc;
}
+static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc)
+{
+ struct ldlm_cb_set_arg *arg;
+ struct ldlm_lock *lock;
+ ENTRY;
+
+ LASSERT(data != NULL);
+
+ arg = req->rq_async_args.pointer_arg[0];
+ lock = req->rq_async_args.pointer_arg[1];
+ LASSERT(lock != NULL);
+ if (rc != 0) {
+ /* If client canceled the lock but the cancel has not
+ * been received yet, we need to update lvbo to have the
+ * proper attributes cached. */
+ if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
+ ldlm_res_lvbo_update(lock->l_resource, NULL,
+ 0, 1);
+ rc = ldlm_handle_ast_error(lock, req, rc,
+ arg->type == LDLM_BL_CALLBACK
+ ? "blocking" : "completion");
+ }
+
+ LDLM_LOCK_PUT(lock);
+
+ if (rc == -ERESTART)
+ atomic_set(&arg->restart, 1);
+
+ RETURN(0);
+}
+
+static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+ struct ldlm_cb_set_arg *arg,
+ struct ldlm_lock *lock,
+ int instant_cancel)
+{
+ int rc = 0;
+ ENTRY;
+
+ if (unlikely(instant_cancel)) {
+ rc = ptl_send_rpc(req, 1);
+ ptlrpc_req_finished(req);
+ if (rc == 0)
+ /* If we cancelled the lock, we need to restart
+ * ldlm_reprocess_queue */
+ atomic_set(&arg->restart, 1);
+ } else {
+ LDLM_LOCK_GET(lock);
+ ptlrpc_set_add_req(arg->set, req);
+ }
+
+ RETURN(rc);
+}
+
/*
* ->l_blocking_ast() method for server-side locks. This is invoked when newly
* enqueued server lock conflicts with given one.
struct ldlm_lock_desc *desc,
void *data, int flag)
{
+ struct ldlm_cb_set_arg *arg = (struct ldlm_cb_set_arg *)data;
struct ldlm_request *body;
struct ptlrpc_request *req;
int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(*body) };
- int instant_cancel = 0, rc = 0;
+ int instant_cancel = 0, rc;
ENTRY;
if (flag == LDLM_CB_CANCELING) {
}
LASSERT(lock);
+ LASSERT(data != NULL);
if (lock->l_export->exp_obd->obd_recovering != 0) {
LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
ldlm_lock_dump(D_ERROR, lock, 0);
if (req == NULL)
RETURN(-ENOMEM);
+ req->rq_async_args.pointer_arg[0] = arg;
+ req->rq_async_args.pointer_arg[1] = lock;
+ req->rq_interpret_reply = ldlm_cb_interpret;
+ req->rq_no_resend = 1;
+
lock_res(lock->l_resource);
if (lock->l_granted_mode != lock->l_req_mode) {
/* this blocking AST will be communicated as part of the
lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
- if (unlikely(instant_cancel)) {
- rc = ptl_send_rpc(req, 1);
- } else {
- rc = ptlrpc_queue_wait(req);
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
- }
- if (rc != 0) {
- /* If client canceled the lock but the cancel has not been
- * recieved yet, we need to update lvbo to have the proper
- * attributes cached. */
- if (rc == -EINVAL)
- ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1);
- rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
- }
-
- ptlrpc_req_finished(req);
-
- /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
- if (!rc && instant_cancel)
- rc = -ERESTART;
+ rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
+ struct ldlm_cb_set_arg *arg = (struct ldlm_cb_set_arg *)data;
struct ldlm_request *body;
struct ptlrpc_request *req;
struct timeval granted_time;
long total_enqueue_wait;
int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(*body) };
- int rc = 0, buffers = 2, instant_cancel = 0;
+ int rc, buffers = 2, instant_cancel = 0;
ENTRY;
LASSERT(lock != NULL);
+ LASSERT(data != NULL);
do_gettimeofday(&granted_time);
total_enqueue_wait = cfs_timeval_sub(&granted_time,
if (req == NULL)
RETURN(-ENOMEM);
+ req->rq_async_args.pointer_arg[0] = arg;
+ req->rq_async_args.pointer_arg[1] = lock;
+ req->rq_interpret_reply = ldlm_cb_interpret;
+ req->rq_no_resend = 1;
+
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
body->lock_handle[0] = lock->l_remote_handle;
body->lock_flags = flags;
lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
- rc = ptlrpc_queue_wait(req);
- if (rc != 0)
- rc = ldlm_handle_ast_error(lock, req, rc, "completion");
-
- ptlrpc_req_finished(req);
-
- /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
- if (!rc && instant_cancel)
- rc = -ERESTART;
+ rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}