author     deen <deen>    Tue, 20 Nov 2007 16:34:01 +0000 (16:34 +0000)
committer  deen <deen>    Tue, 20 Nov 2007 16:34:01 +0000 (16:34 +0000)

Instead of sending blocking and completion callbacks as separate
requests, add them to a set and send them in parallel.

b=11301
i=green
i=tappro

lustre/ChangeLog
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
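
In short, this change replaces one synchronous RPC per lock with a ptlrpc
request set that is flushed whenever PARALLEL_AST_LIMIT callbacks have been
queued. The following is a minimal sketch of that batching pattern (not part
of the patch), using only the set primitives that appear in the hunks below;
the queue_ast_request() helper is a hypothetical placeholder for the code
that prepares a single AST request:

/* Sketch only: flush the set every PARALLEL_AST_LIMIT requests, as
 * ldlm_run_{bl,cp}_ast_work() do below.  queue_ast_request() is a
 * hypothetical placeholder that returns a prepared AST request. */
static int send_asts_in_batches(struct list_head *rpc_list)
{
        struct ptlrpc_request_set *set = ptlrpc_prep_set();
        struct ldlm_lock *lock, *next;
        int count = 0;

        list_for_each_entry_safe(lock, next, rpc_list, l_bl_ast) {
                ptlrpc_set_add_req(set, queue_ast_request(lock));
                if (++count == PARALLEL_AST_LIMIT) {
                        ptlrpc_set_wait(set);    /* send the batch, wait for replies */
                        ptlrpc_set_destroy(set);
                        set = ptlrpc_prep_set(); /* fresh set for the remainder */
                        count = 0;
                }
        }

        if (count > 0)
                ptlrpc_set_wait(set);
        ptlrpc_set_destroy(set);                 /* the last set is always destroyed */
        return 0;
}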

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index a0f3f1e..84d35a0 100644
@@ -492,6 +492,12 @@ Description: touch file failed when fs is not full
 Details    : OST in recovery should not be discarded by MDS in alloc_qos(),
              otherwise we can get ENOSP while fs is not full.
 
+Severity   : normal
+Bugzilla   : 11301
+Description: parallel lock callbacks
+Details    : Instead of sending blocking and completion callbacks as separate
+             requests, add them to a set and send them in parallel.
+
 --------------------------------------------------------------------------------
 
 2007-08-10         Cluster File Systems, Inc. <info@clusterfs.com>
diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h
index 89b63c8..5b4b0dd 100644
@@ -48,6 +48,17 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                      struct ldlm_lock *new);
 
 /* ldlm_lock.c */
+
+/* Number of blocking/completion callbacks that will be sent in
+ * parallel (see bug 11301). */
+#define PARALLEL_AST_LIMIT      200
+
+struct ldlm_cb_set_arg {
+        struct ptlrpc_request_set *set;
+        atomic_t restart;
+        __u16 type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
+};
+
 void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list);
 struct ldlm_lock *
 ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *,
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 4f9be34..c7ec85a 100644
@@ -1268,13 +1268,38 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
         RETURN(rc);
 }
 
+/* Helper function for the ldlm_run_{bl,cp}_ast_work() pair.
+ *
+ * Send the rpc set specified by @arg->set and then destroy it.
+ * Create a new one if the @do_create flag is set. */
+static void
+ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
+{
+        int rc;
+
+        rc = ptlrpc_set_wait(arg->set);
+        if (arg->type == LDLM_BL_CALLBACK)
+                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
+        ptlrpc_set_destroy(arg->set);
+
+        if (do_create)
+                arg->set = ptlrpc_prep_set();
+}
+
 int ldlm_run_bl_ast_work(struct list_head *rpc_list)
 {
+        struct ldlm_cb_set_arg arg;
         struct list_head *tmp, *pos;
         struct ldlm_lock_desc d;
-        int rc = 0, retval = 0;
+        int ast_count;
+        int rc = 0;
         ENTRY;
 
+        arg.set = ptlrpc_prep_set();
+        atomic_set(&arg.restart, 0);
+        arg.type = LDLM_BL_CALLBACK;
+
+        ast_count = 0;
         list_for_each_safe(tmp, pos, rpc_list) {
                 struct ldlm_lock *lock =
                         list_entry(tmp, struct ldlm_lock, l_bl_ast);
@@ -1293,24 +1318,44 @@ int ldlm_run_bl_ast_work(struct list_head *rpc_list)
 
                 LDLM_LOCK_PUT(lock->l_blocking_lock);
                 lock->l_blocking_lock = NULL;
-                rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING);
-
-                if (rc == -ERESTART)
-                        retval = rc;
-                else if (rc)
-                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
-                               "disconnect client\n");
+                rc = lock->l_blocking_ast(lock, &d, (void *)&arg, 
+                                          LDLM_CB_BLOCKING);
                 LDLM_LOCK_PUT(lock);
+                ast_count++;
+
+                /* Send the request set once it reaches PARALLEL_AST_LIMIT,
+                 * and create a new set for the requests remaining in
+                 * @rpc_list */
+                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
+                        ldlm_send_and_maybe_create_set(&arg, 1);
+                        ast_count = 0;
+                }
         }
-        RETURN(retval);
+
+        if (ast_count > 0)
+                ldlm_send_and_maybe_create_set(&arg, 0);
+        else
+                /* If the number of ASTs is a multiple of PARALLEL_AST_LIMIT,
+                 * or @rpc_list was initially empty, @arg.set must be
+                 * destroyed here, otherwise we get a memory leak. */
+                ptlrpc_set_destroy(arg.set);
+
+        RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
 }
 
 int ldlm_run_cp_ast_work(struct list_head *rpc_list)
 {
+        struct ldlm_cb_set_arg arg;
         struct list_head *tmp, *pos;
-        int rc = 0, retval = 0;
+        int ast_count;
+        int rc = 0;
         ENTRY;
 
+        arg.set = ptlrpc_prep_set();
+        atomic_set(&arg.restart, 0);
+        arg.type = LDLM_CP_CALLBACK;
+
         /* It's possible to receive a completion AST before we've set
          * the l_completion_ast pointer: either because the AST arrived
          * before the reply, or simply because there's a small race
@@ -1322,6 +1367,7 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list)
          * reader/writer reference, which we won't do until we get the
          * reply and finish enqueueing. */
 
+        ast_count = 0;
         list_for_each_safe(tmp, pos, rpc_list) {
                 struct ldlm_lock *lock =
                         list_entry(tmp, struct ldlm_lock, l_cp_ast);
@@ -1333,16 +1379,31 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list)
                 lock->l_flags &= ~LDLM_FL_CP_REQD;
                 unlock_res_and_lock(lock);
 
-                if (lock->l_completion_ast != NULL)
-                        rc = lock->l_completion_ast(lock, 0, 0);
-                if (rc == -ERESTART)
-                        retval = rc;
-                else if (rc)
-                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
-                               "disconnect client\n");
+                if (lock->l_completion_ast != NULL) {
+                        rc = lock->l_completion_ast(lock, 0, (void *)&arg);
+                        ast_count++;
+                }
                 LDLM_LOCK_PUT(lock);
+
+                /* Send the request set once it reaches PARALLEL_AST_LIMIT,
+                 * and create a new set for the requests remaining in
+                 * @rpc_list */
+                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
+                        ldlm_send_and_maybe_create_set(&arg, 1);
+                        ast_count = 0;
+                }
         }
-        RETURN(retval);
+
+        if (ast_count > 0)
+                ldlm_send_and_maybe_create_set(&arg, 0);
+        else
+                /* If the number of ASTs is a multiple of PARALLEL_AST_LIMIT,
+                 * or @rpc_list was initially empty, @arg.set must be
+                 * destroyed here, otherwise we get a memory leak. */
+                ptlrpc_set_destroy(arg.set);
+
+        RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 08b2ead..35b08ca 100644
@@ -495,6 +495,60 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
         return rc;
 }
 
+static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc)
+{
+        struct ldlm_cb_set_arg *arg;
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        LASSERT(data != NULL);
+
+        arg = req->rq_async_args.pointer_arg[0];
+        lock = req->rq_async_args.pointer_arg[1];
+        LASSERT(lock != NULL);
+        if (rc != 0) {
+                /* If the client canceled the lock but the cancel has not
+                 * been received yet, we need to update lvbo to have the
+                 * proper attributes cached. */
+                if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
+                        ldlm_res_lvbo_update(lock->l_resource, NULL, 
+                                             0, 1);
+                rc = ldlm_handle_ast_error(lock, req, rc, 
+                                           arg->type == LDLM_BL_CALLBACK
+                                           ? "blocking" : "completion");
+        }                
+
+        LDLM_LOCK_PUT(lock);
+
+        if (rc == -ERESTART)
+                atomic_set(&arg->restart, 1);
+
+        RETURN(0);
+}
+
+static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+                                          struct ldlm_cb_set_arg *arg,
+                                          struct ldlm_lock *lock,
+                                          int instant_cancel)
+{
+        int rc = 0;
+        ENTRY;
+
+        if (unlikely(instant_cancel)) {
+                rc = ptl_send_rpc(req, 1);
+                ptlrpc_req_finished(req);
+                if (rc == 0)
+                        /* If we cancelled the lock, we need to restart
+                         * ldlm_reprocess_queue */
+                        atomic_set(&arg->restart, 1);
+        } else {
+                LDLM_LOCK_GET(lock);
+                ptlrpc_set_add_req(arg->set, req);
+        }       
+
+        RETURN(rc);
+}
+
 /*
  * ->l_blocking_ast() method for server-side locks. This is invoked when newly
  * enqueued server lock conflicts with given one.
@@ -506,11 +560,12 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                              struct ldlm_lock_desc *desc,
                              void *data, int flag)
 {
+        struct ldlm_cb_set_arg *arg = (struct ldlm_cb_set_arg *)data;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
         int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int instant_cancel = 0, rc = 0;
+        int instant_cancel = 0, rc;
         ENTRY;
 
         if (flag == LDLM_CB_CANCELING) {
@@ -519,6 +574,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         }
 
         LASSERT(lock);
+        LASSERT(data != NULL);
         if (lock->l_export->exp_obd->obd_recovering != 0) {
                 LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
                 ldlm_lock_dump(D_ERROR, lock, 0);
@@ -530,6 +586,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
+        req->rq_async_args.pointer_arg[0] = arg;
+        req->rq_async_args.pointer_arg[1] = lock;
+        req->rq_interpret_reply = ldlm_cb_interpret;
+        req->rq_no_resend = 1;
+
         lock_res(lock->l_resource);
         if (lock->l_granted_mode != lock->l_req_mode) {
                 /* this blocking AST will be communicated as part of the
@@ -574,42 +635,25 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
 
-        if (unlikely(instant_cancel)) {
-                rc = ptl_send_rpc(req, 1);
-        } else {
-                rc = ptlrpc_queue_wait(req);
-                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
-        }
-        if (rc != 0) {
-                /* If client canceled the lock but the cancel has not been
-                 * recieved yet, we need to update lvbo to have the proper
-                 * attributes cached. */
-                if (rc == -EINVAL)
-                        ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1);
-                rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
-        }
-
-        ptlrpc_req_finished(req);
-
-        /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
-        if (!rc && instant_cancel)
-                rc = -ERESTART;
+        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
 
 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
+        struct ldlm_cb_set_arg *arg = (struct ldlm_cb_set_arg *)data;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
         struct timeval granted_time;
         long total_enqueue_wait;
         int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                         [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int rc = 0, buffers = 2, instant_cancel = 0;
+        int rc, buffers = 2, instant_cancel = 0;
         ENTRY;
 
         LASSERT(lock != NULL);
+        LASSERT(data != NULL);
 
         do_gettimeofday(&granted_time);
         total_enqueue_wait = cfs_timeval_sub(&granted_time,
@@ -632,6 +676,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (req == NULL)
                 RETURN(-ENOMEM);
 
+        req->rq_async_args.pointer_arg[0] = arg;
+        req->rq_async_args.pointer_arg[1] = lock;
+        req->rq_interpret_reply = ldlm_cb_interpret;
+        req->rq_no_resend = 1;
+
         body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
         body->lock_handle[0] = lock->l_remote_handle;
         body->lock_flags = flags;
@@ -683,15 +732,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc != 0)
-                rc = ldlm_handle_ast_error(lock, req, rc, "completion");
-
-        ptlrpc_req_finished(req);
-
-        /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
-        if (!rc && instant_cancel)
-                rc = -ERESTART;
+        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
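
For reference, the pieces above connect as follows: each AST request stores
the shared ldlm_cb_set_arg and its lock in rq_async_args and registers
ldlm_cb_interpret() as its reply-interpret callback, which raises arg->restart
when a reply fails in a way that requires requeueing. Below is a condensed
sketch of that control flow under those assumptions; queue_one_ast() is a
hypothetical stand-in for ldlm_server_{blocking,completion}_ast():

/* Sketch only: condensed view of the flow introduced by this patch.
 * queue_one_ast() stands in for ldlm_server_{blocking,completion}_ast(),
 * which stash @arg and the lock in rq_async_args and register
 * ldlm_cb_interpret() as rq_interpret_reply. */
static int run_ast_batch(struct list_head *rpc_list, __u16 type)
{
        struct ldlm_cb_set_arg arg;
        struct ldlm_lock *lock, *next;

        arg.set = ptlrpc_prep_set();
        atomic_set(&arg.restart, 0);
        arg.type = type;

        list_for_each_entry_safe(lock, next, rpc_list, l_bl_ast)
                queue_one_ast(&arg, lock);       /* adds the request to arg.set */

        ptlrpc_set_wait(arg.set);                /* replies run ldlm_cb_interpret() */
        ptlrpc_set_destroy(arg.set);

        /* ldlm_cb_interpret() sets arg.restart on failures that need a retry */
        return atomic_read(&arg.restart) ? -ERESTART : 0;
}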