Whamcloud - gitweb
LU-1061 agl: cl_locks_prune() waits for the last user
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 9d95c13..4c124d4 100644 (file)
@@ -135,6 +135,8 @@ char *ldlm_it2str(int it)
                 return "unlink";
         case IT_GETXATTR:
                 return "getxattr";
+        case IT_LAYOUT:
+                return "layout";
         default:
                 CERROR("Unknown intent %d\n", it);
                 return "UNKNOWN";
@@ -435,6 +437,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         lock->l_exp_refs_nr = 0;
         lock->l_exp_refs_target = NULL;
 #endif
+        CFS_INIT_LIST_HEAD(&lock->l_exp_list);
 
         RETURN(lock);
 }
@@ -566,7 +569,8 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
                 /* Make sure all the right bits are set in this lock we
                    are going to pass to client */
                 LASSERTF(lock->l_policy_data.l_inodebits.bits ==
-                         (MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE),
+                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+                          MDS_INODELOCK_LAYOUT),
                          "Inappropriate inode lock bits during "
                          "conversion " LPU64 "\n",
                          lock->l_policy_data.l_inodebits.bits);
@@ -1052,7 +1056,7 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue,
 
                 if (!unref &&
                     (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
-                     lock->l_fail_value != 0))
+                     lock->l_failed))
                         continue;
 
                 if ((flags & LDLM_FL_LOCAL_ONLY) &&
@@ -1072,19 +1076,19 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue,
         return NULL;
 }
 
-void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc)
+void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
 {
-        if (lock->l_fail_value == 0) {
-                lock->l_fail_value = rc;
-                cfs_waitq_signal(&lock->l_waitq);
+        if (!lock->l_failed) {
+                lock->l_failed = 1;
+                cfs_waitq_broadcast(&lock->l_waitq);
         }
 }
 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
 
-void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc)
+void ldlm_lock_fail_match(struct ldlm_lock *lock)
 {
         lock_res_and_lock(lock);
-        ldlm_lock_fail_match_locked(lock, rc);
+        ldlm_lock_fail_match_locked(lock);
         unlock_res_and_lock(lock);
 }
 EXPORT_SYMBOL(ldlm_lock_fail_match);
@@ -1092,7 +1096,7 @@ EXPORT_SYMBOL(ldlm_lock_fail_match);
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
 {
         lock->l_flags |= LDLM_FL_LVB_READY;
-        cfs_waitq_signal(&lock->l_waitq);
+        cfs_waitq_broadcast(&lock->l_waitq);
 }
 
 void ldlm_lock_allow_match(struct ldlm_lock *lock)
@@ -1202,7 +1206,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                         /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                         l_wait_event(lock->l_waitq,
                                      lock->l_flags & LDLM_FL_LVB_READY ||
-                                     lock->l_fail_value != 0,
+                                     lock->l_failed,
                                      &lwi);
                         if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                 if (flags & LDLM_FL_TEST_LOCK)
@@ -1259,7 +1263,7 @@ ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
         if (lock != NULL) {
                 lock_res_and_lock(lock);
                 if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
-                    lock->l_fail_value != 0)
+                    lock->l_failed)
                         GOTO(out, mode);
 
                 if (lock->l_flags & LDLM_FL_CBPENDING &&
@@ -1307,7 +1311,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
         lock->l_req_mode = mode;
         lock->l_ast_data = data;
         lock->l_pid = cfs_curproc_pid();
-        lock->l_ns_srv = ns_is_server(ns);
+        lock->l_ns_srv = !!ns_is_server(ns);
         if (cbs) {
                 lock->l_blocking_ast = cbs->lcs_blocking;
                 lock->l_completion_ast = cbs->lcs_completion;
@@ -1574,31 +1578,37 @@ ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type)
 {
-        struct ldlm_cb_set_arg arg = { 0 };
         struct l_wait_info     lwi = { 0 };
+        struct ldlm_cb_set_arg *arg;
         cfs_list_t *tmp, *pos;
         int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
         unsigned int max_ast_count;
+        int rc;
         ENTRY;
 
         if (cfs_list_empty(rpc_list))
                 RETURN(0);
 
-        cfs_atomic_set(&arg.restart, 0);
-        cfs_atomic_set(&arg.rpcs, 0);
-        cfs_waitq_init(&arg.waitq);
+        OBD_ALLOC_PTR(arg);
+        if (arg == NULL)
+                RETURN(-ENOMEM);
+
+        cfs_atomic_set(&arg->restart, 0);
+        cfs_atomic_set(&arg->rpcs, 0);
+        cfs_atomic_set(&arg->refcount, 1);
+        cfs_waitq_init(&arg->waitq);
 
         switch (ast_type) {
         case LDLM_WORK_BL_AST:
-                arg.type = LDLM_BL_CALLBACK;
+                arg->type = LDLM_BL_CALLBACK;
                 work_ast_lock = ldlm_work_bl_ast_lock;
                 break;
         case LDLM_WORK_CP_AST:
-                arg.type = LDLM_CP_CALLBACK;
+                arg->type = LDLM_CP_CALLBACK;
                 work_ast_lock = ldlm_work_cp_ast_lock;
                 break;
         case LDLM_WORK_REVOKE_AST:
-                arg.type = LDLM_BL_CALLBACK;
+                arg->type = LDLM_BL_CALLBACK;
                 work_ast_lock = ldlm_work_revoke_ast_lock;
                 break;
         default:
@@ -1606,22 +1616,24 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
         }
 
         max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
-        arg.threshold = max_ast_count;
+        arg->threshold = max_ast_count;
 
         cfs_list_for_each_safe(tmp, pos, rpc_list) {
-                (void)work_ast_lock(tmp, &arg);
-                if (cfs_atomic_read(&arg.rpcs) < max_ast_count)
+                (void)work_ast_lock(tmp, arg);
+                if (cfs_atomic_read(&arg->rpcs) < max_ast_count)
                         continue;
 
-                l_wait_event(arg.waitq,
-                             cfs_atomic_read(&arg.rpcs) < arg.threshold,
+                l_wait_event(arg->waitq,
+                             cfs_atomic_read(&arg->rpcs) < arg->threshold,
                              &lwi);
         }
 
-        arg.threshold = 1;
-        l_wait_event(arg.waitq, cfs_atomic_read(&arg.rpcs) == 0, &lwi);
+        arg->threshold = 1;
+        l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi);
 
-        RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
+        rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
+        ldlm_csa_put(arg);
+        RETURN(rc);
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)