Whamcloud - gitweb
Changes to parallel callbacks code to avoid duplication and
authordeen <deen>
Wed, 13 Feb 2008 19:17:25 +0000 (19:17 +0000)
committerdeen <deen>
Wed, 13 Feb 2008 19:17:25 +0000 (19:17 +0000)
resolve problem with ldlm_revoke_export_locks().

b=11301
i=green
i=fanyong

lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_plain.c

index 82fbeed..3ff8e75 100644 (file)
@@ -414,7 +414,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                 if (list_empty(&lock->l_res_link))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                 unlock_res(res);
-                rc = ldlm_run_bl_ast_work(&rpc_list);
+                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                 lock_res(res);
                 if (rc == -ERESTART)
                         GOTO(restart, -ERESTART);
index 6cb71e7..919097b 100644 (file)
@@ -399,7 +399,7 @@ restart:
                                                      &rpc_list);
 
                                 unlock_res(res);
-                                rc = ldlm_run_bl_ast_work(&rpc_list);
+                                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                                 lock_res(res);
                                 if (rc == -ERESTART)
                                         GOTO(restart, -ERESTART);
index 97f941f..34c6295 100644 (file)
@@ -157,7 +157,7 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
                 if (list_empty(&lock->l_res_link))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                 unlock_res(res);
-                rc = ldlm_run_bl_ast_work(&rpc_list);
+                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                 lock_res(res);
                 if (rc == -ERESTART)
                         GOTO(restart, -ERESTART);
index ebfbaac..741190a 100644 (file)
@@ -66,6 +66,12 @@ struct ldlm_cb_set_arg {
         __u32 type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
 };
 
+typedef enum {
+        LDLM_WORK_BL_AST,
+        LDLM_WORK_CP_AST,
+        LDLM_WORK_REVOKE_AST
+} ldlm_desc_ast_t; 
+
 void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list);
 struct ldlm_lock *
 ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *,
@@ -80,8 +86,7 @@ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                                 struct list_head *work_list);
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                          struct list_head *work_list);
-int ldlm_run_bl_ast_work(struct list_head *rpc_list);
-int ldlm_run_cp_ast_work(struct list_head *rpc_list);
+int ldlm_run_ast_work(struct list_head *rpc_list, ldlm_desc_ast_t ast_type);
 int ldlm_lock_remove_from_lru(struct ldlm_lock *lock);
 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock);
 void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock);
index 88e1ef2..abc77f6 100644 (file)
@@ -1270,94 +1270,62 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
         RETURN(rc);
 }
 
-/* Helper function for pair ldlm_run_{bl,cp}_ast_work().
+/* Helper function for ldlm_run_ast_work().
  * 
  * Send an existing rpc set specified by @arg->set and then
  * destroy it. Create new one if @do_create flag is set. */
 static void
 ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
 {
-        int rc;
+        ENTRY;
 
-        rc = ptlrpc_set_wait(arg->set);
+        ptlrpc_set_wait(arg->set);
         if (arg->type == LDLM_BL_CALLBACK)
                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
         ptlrpc_set_destroy(arg->set);
 
         if (do_create)
                 arg->set = ptlrpc_prep_set();
+
+        EXIT;
 }
 
-int ldlm_run_bl_ast_work(struct list_head *rpc_list)
+static int
+ldlm_work_bl_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg)
 {
-        struct ldlm_cb_set_arg arg;
-        struct list_head *tmp, *pos;
         struct ldlm_lock_desc d;
-        int ast_count;
-        int rc = 0;
+        struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_bl_ast);
         ENTRY;
 
-        arg.set = ptlrpc_prep_set();
-        atomic_set(&arg.restart, 0);
-        arg.type = LDLM_BL_CALLBACK;
-
-        ast_count = 0;
-        list_for_each_safe(tmp, pos, rpc_list) {
-                struct ldlm_lock *lock =
-                        list_entry(tmp, struct ldlm_lock, l_bl_ast);
-
-                /* nobody should touch l_bl_ast */
-                lock_res_and_lock(lock);
-                list_del_init(&lock->l_bl_ast);
-
-                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
-                LASSERT(lock->l_bl_ast_run == 0);
-                LASSERT(lock->l_blocking_lock);
-                lock->l_bl_ast_run++;
-                unlock_res_and_lock(lock);
-
-                ldlm_lock2desc(lock->l_blocking_lock, &d);
+        /* nobody should touch l_bl_ast */
+        lock_res_and_lock(lock);
+        list_del_init(&lock->l_bl_ast);
 
-                LDLM_LOCK_PUT(lock->l_blocking_lock);
-                lock->l_blocking_lock = NULL;
-                rc = lock->l_blocking_ast(lock, &d, (void *)&arg, 
-                                          LDLM_CB_BLOCKING);
-                LDLM_LOCK_PUT(lock);
-                ast_count++;
+        LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+        LASSERT(lock->l_bl_ast_run == 0);
+        LASSERT(lock->l_blocking_lock);
+        lock->l_bl_ast_run++;
+        unlock_res_and_lock(lock);
 
-                /* Send the request set if it exceeds the PARALLEL_AST_LIMIT,
-                 * and create a new set for requests that remained in
-                 * @rpc_list */
-                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
-                        ldlm_send_and_maybe_create_set(&arg, 1);
-                        ast_count = 0;
-                }
-        }
+        ldlm_lock2desc(lock->l_blocking_lock, &d);
 
-        if (ast_count > 0)
-                ldlm_send_and_maybe_create_set(&arg, 0);
-        else
-                /* In case when number of ASTs is multiply of
-                 * PARALLEL_AST_LIMIT or @rpc_list was initially empty,
-                 * @arg.set must be destroyed here, otherwise we get 
-                 * write memory leaking. */
-                ptlrpc_set_destroy(arg.set);
+        LDLM_LOCK_PUT(lock->l_blocking_lock);
+        lock->l_blocking_lock = NULL;
+        lock->l_blocking_ast(lock, &d, (void *)arg, 
+                             LDLM_CB_BLOCKING);
+        LDLM_LOCK_PUT(lock);
 
-        RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
+        RETURN(1);
 }
 
-int ldlm_run_cp_ast_work(struct list_head *rpc_list)
+static int
+ldlm_work_cp_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg)
 {
-        struct ldlm_cb_set_arg arg;
-        struct list_head *tmp, *pos;
-        int ast_count;
+        struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_cp_ast);
+        ldlm_completion_callback completion_callback;
         int rc = 0;
         ENTRY;
 
-        arg.set = ptlrpc_prep_set();
-        atomic_set(&arg.restart, 0);
-        arg.type = LDLM_CP_CALLBACK;
-
         /* It's possible to receive a completion AST before we've set
          * the l_completion_ast pointer: either because the AST arrived
          * before the reply, or simply because there's a small race
@@ -1369,27 +1337,75 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list)
          * reader/writer reference, which we won't do until we get the
          * reply and finish enqueueing. */
 
+        /* nobody should touch l_cp_ast */
+        lock_res_and_lock(lock);
+        list_del_init(&lock->l_cp_ast);
+        LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
+        /* save l_completion_ast since it can be changed by
+         * mds_intent_policy(), see bug 14225 */
+        completion_callback = lock->l_completion_ast;
+        lock->l_flags &= ~LDLM_FL_CP_REQD;
+        unlock_res_and_lock(lock);
+
+        if (completion_callback != NULL) {
+                completion_callback(lock, 0, (void *)arg);
+                rc = 1;
+        }
+        LDLM_LOCK_PUT(lock);
+
+        RETURN(rc);
+}
+
+static int
+ldlm_work_revoke_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg)
+{
+        struct ldlm_lock_desc desc;
+        struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_export_chain);
+        ENTRY;
+
+        list_del_init(&lock->l_export_chain);
+
+        /* the desc just pretend to exclusive */
+        ldlm_lock2desc(lock, &desc);
+        desc.l_req_mode = LCK_EX;
+        desc.l_granted_mode = 0;
+
+        lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
+        LDLM_LOCK_PUT(lock);
+
+        RETURN(1);
+}
+
+int ldlm_run_ast_work(struct list_head *rpc_list, ldlm_desc_ast_t ast_type)
+{
+        struct ldlm_cb_set_arg arg;
+        struct list_head *tmp, *pos;
+        int (*work_ast_lock)(struct list_head *tmp, struct ldlm_cb_set_arg *arg);
+        int ast_count;
+        ENTRY;
+
+        arg.set = ptlrpc_prep_set();
+        atomic_set(&arg.restart, 0);
+        switch (ast_type) {
+        case LDLM_WORK_BL_AST:
+                arg.type = LDLM_BL_CALLBACK;
+                work_ast_lock = ldlm_work_bl_ast_lock;
+                break;
+        case LDLM_WORK_CP_AST:
+                arg.type = LDLM_CP_CALLBACK;
+                work_ast_lock = ldlm_work_cp_ast_lock;
+                break;
+        case LDLM_WORK_REVOKE_AST:
+                arg.type = LDLM_BL_CALLBACK;
+                work_ast_lock = ldlm_work_revoke_ast_lock;
+                break;
+        default:
+                LBUG();
+        }
+
         ast_count = 0;
         list_for_each_safe(tmp, pos, rpc_list) {
-                struct ldlm_lock *lock =
-                        list_entry(tmp, struct ldlm_lock, l_cp_ast);
-                ldlm_completion_callback completion_callback;
-
-                /* nobody should touch l_cp_ast */
-                lock_res_and_lock(lock);
-                list_del_init(&lock->l_cp_ast);
-                LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
-                /* save l_completion_ast since it can be changed by
-                 * mds_intent_policy(), see bug 14225 */
-                completion_callback = lock->l_completion_ast;
-                lock->l_flags &= ~LDLM_FL_CP_REQD;
-                unlock_res_and_lock(lock);
-
-                if (completion_callback != NULL) {
-                        rc = completion_callback(lock, 0, (void *)&arg);
-                        ast_count++;
-                }
-                LDLM_LOCK_PUT(lock);
+                ast_count += work_ast_lock(tmp, &arg);
 
                 /* Send the request set if it exceeds the PARALLEL_AST_LIMIT,
                  * and create a new set for requests that remained in
@@ -1471,7 +1487,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
         unlock_res(res);
 
-        rc = ldlm_run_cp_ast_work(&rpc_list);
+        rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
         if (rc == -ERESTART) {
                 LASSERT(list_empty(&rpc_list));
                 goto restart;
@@ -1719,7 +1735,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
         unlock_res_and_lock(lock);
 
         if (granted)
-                ldlm_run_cp_ast_work(&rpc_list);
+                ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
         RETURN(res);
 }
 
index 8ef4562..8c2ea6c 100644 (file)
@@ -1389,7 +1389,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
 
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
 
-        ldlm_run_cp_ast_work(&ast_list);
+        ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
@@ -1760,7 +1760,6 @@ void ldlm_revoke_export_locks(struct obd_export *exp)
         struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks;
         struct list_head  rpc_list;
         struct ldlm_lock *lock, *next;
-        struct ldlm_lock_desc desc;
 
         ENTRY;
         INIT_LIST_HEAD(&rpc_list);
@@ -1791,26 +1790,14 @@ void ldlm_revoke_export_locks(struct obd_export *exp)
 
                 lock->l_flags |= LDLM_FL_AST_SENT;
                 list_move(&lock->l_export_chain, &rpc_list);
+                LDLM_LOCK_GET(lock);
 
                 unlock_res_and_lock(lock);
         }
         spin_unlock(&exp->exp_ldlm_data.led_lock);
 
-        while (!list_empty(&rpc_list)) {
-                lock = list_entry(rpc_list.next, struct ldlm_lock,
-                                  l_export_chain);
-                list_del_init(&lock->l_export_chain);
-
-                /* the desc just pretend to exclusive */
-                ldlm_lock2desc(lock, &desc);
-                desc.l_req_mode = LCK_EX;
-                desc.l_granted_mode = 0;
+        ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
 
-                LDLM_LOCK_GET(lock);
-                lock->l_blocking_ast(lock, &desc, lock->l_ast_data,
-                                     LDLM_CB_BLOCKING);
-                LDLM_LOCK_PUT(lock);
-        }
         EXIT;
 }
 
index c6b2c89..6ced357 100644 (file)
@@ -132,7 +132,7 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                 if (list_empty(&lock->l_res_link))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                 unlock_res(res);
-                rc = ldlm_run_bl_ast_work(&rpc_list);
+                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                 lock_res(res);
                 if (rc == -ERESTART)
                         GOTO(restart, -ERESTART);