Whamcloud - gitweb
b=3984
authoralex <alex>
Sun, 10 Jul 2005 23:10:17 +0000 (23:10 +0000)
committeralex <alex>
Sun, 10 Jul 2005 23:10:17 +0000 (23:10 +0000)
 - b_ldlm_newlocking landed. We need it to pass some CMD2 performance tests.
   NOTE: the new locking rules introduced by this patch are still experimental!

24 files changed:
lustre/cmobd/cm_oss_reint.c
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_lib.h
lustre/ldlm/l_lock.c
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_plain.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/llite/file.c
lustre/llite/llite_lib.c
lustre/mdc/mdc_locks.c
lustre/mds/handler.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdclass/genops.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_lvb.c
lustre/osc/osc_request.c

index eff4777..fc9a9b9 100644 (file)
@@ -27,6 +27,7 @@
 #include <linux/lustre_lib.h>
 #include <linux/lustre_net.h>
 #include <linux/lustre_idl.h>
+#include <linux/lustre_dlm.h>
 #include <linux/obd_class.h>
 #include <linux/lustre_log.h>
 #include <linux/lustre_cmobd.h>
@@ -156,20 +157,21 @@ static int cache_blocking_ast(struct ldlm_lock *lock,
         }
 
         /* XXX layering violation!  -phil */
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
+        
         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
          * such that filter_blocking_ast is called just before l_i_p takes the
          * ns_lock, then by the time we get the lock, we might not be the
          * correct blocking function anymore.  So check, and return early, if
          * so. */
         if (lock->l_blocking_ast != cache_blocking_ast) {
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                unlock_res(lock->l_resource);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 
         if (do_ast) {
                 struct lustre_handle lockh;
index 74b1c52..2ad9b21 100644 (file)
@@ -99,6 +99,17 @@ typedef enum {
  * list. */
 #define LDLM_FL_KMS_IGNORE     0x200000
 
+/* completion ast to be executed */
+#define LDLM_FL_CP_REQD        0x400000
+
+/* cleanup_resource has already handled the lock */
+#define LDLM_FL_CLEANED        0x800000
+
+/* optimization hint: LDLM can run blocking callback from current context
+ * w/o involving separate thread. in order to decrease cs rate -bzzz */
+#define LDLM_FL_ATOMIC_CB      0x1000000
+
+
 /* The blocking callback is overloaded to perform two functions.  These flags
  * indicate which operation should be performed. */
 #define LDLM_CB_BLOCKING    1
@@ -148,6 +159,25 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
    -
 */
 
+/*
+ * Locking rules:
+ *
+ * lr_lock
+ *
+ * lr_lock
+ *     waiting_locks_spinlock
+ *
+ * lr_lock
+ *     led_lock
+ *
+ * lr_lock
+ *     ns_unused_lock
+ *
+ * lr_lvb_sem
+ *     lr_lock
+ *
+ */
+
 struct ldlm_lock;
 struct ldlm_resource;
 struct ldlm_namespace;
@@ -168,9 +198,9 @@ struct ldlm_namespace {
         char                  *ns_name;
         __u32                  ns_client; /* is this a client-side lock tree? */
         struct list_head      *ns_hash; /* hash table for ns */
+        spinlock_t             ns_hash_lock;
         __u32                  ns_refcount; /* count of resources in the hash */
         struct list_head       ns_root_list; /* all root resources in ns */
-        struct lustre_lock     ns_lock; /* protects hash, refcount, list */
         struct list_head       ns_list_chain; /* position in global NS list */
         /*
         struct proc_dir_entry *ns_proc_dir;
@@ -178,11 +208,12 @@ struct ldlm_namespace {
 
         struct list_head       ns_unused_list; /* all root resources in ns */
         int                    ns_nr_unused;
+        spinlock_t             ns_unused_lock;
+
         unsigned int           ns_max_unused;
         unsigned long          ns_next_dump;   /* next dump time */
 
-        spinlock_t             ns_counter_lock;
-        __u64                  ns_locks;
+        atomic_t               ns_locks;
         __u64                  ns_resources;
         ldlm_res_policy        ns_policy;
         struct ldlm_valblock_ops *ns_lvbo;
@@ -212,14 +243,27 @@ typedef int (*ldlm_glimpse_callback)(struct ldlm_lock *lock, void *data);
 struct ldlm_lock {
         struct portals_handle l_handle; // must be first in the structure
         atomic_t              l_refc;
+
+        /* ldlm_lock_change_resource() can change this */
         struct ldlm_resource *l_resource;
+
+        /* set once, no need to protect it */
         struct ldlm_lock     *l_parent;
+
+        /* protected by ns_hash_lock */
         struct list_head      l_children;
         struct list_head      l_childof;
+
+        /* protected by ns_hash_lock. FIXME */
         struct list_head      l_lru;
+
+        /* protected by lr_lock */
         struct list_head      l_res_link; // position in one of three res lists
+
+        /* protected by led_lock */
         struct list_head      l_export_chain; // per-export chain of locks
 
+        /* protected by lr_lock */
         ldlm_mode_t           l_req_mode;
         ldlm_mode_t           l_granted_mode;
 
@@ -229,10 +273,14 @@ struct ldlm_lock {
 
         struct obd_export    *l_export;
         struct obd_export    *l_conn_export;
+
+        /* protected by lr_lock */
         __u32                 l_flags;
+
         struct lustre_handle  l_remote_handle;
         ldlm_policy_data_t    l_policy_data;
 
+        /* protected by lr_lock */
         __u32                 l_readers;
         __u32                 l_writers;
         __u8                  l_destroyed;
@@ -253,12 +301,20 @@ struct ldlm_lock {
         void                 *l_ast_data;
 
         /* Server-side-only members */
+
+        /* protected by elt_lock */
         struct list_head      l_pending_chain;  /* callbacks pending */
         unsigned long         l_callback_timeout;
 
         __u32                 l_pid;            /* pid which created this lock */
 
         struct list_head      l_tmp;
+
+        /* for ldlm_add_ast_work_item() */
+        struct list_head      l_bl_ast;
+        struct list_head      l_cp_ast;
+        struct ldlm_lock     *l_blocking_lock; 
+        int                   l_bl_ast_run;
 };
 
 #define LDLM_PLAIN       10
@@ -271,18 +327,21 @@ struct ldlm_lock {
 
 struct ldlm_resource {
         struct ldlm_namespace *lr_namespace;
+
+        /* protected by ns_hash_lock */
         struct list_head       lr_hash;
         struct ldlm_resource  *lr_parent;   /* 0 for a root resource */
         struct list_head       lr_children; /* list head for child resources */
         struct list_head       lr_childof;  /* part of ns_root_list if root res,
                                              * part of lr_children if child */
+        spinlock_t             lr_lock;
 
+        /* protected by lr_lock */
         struct list_head       lr_granted;
         struct list_head       lr_converting;
         struct list_head       lr_waiting;
         ldlm_mode_t            lr_most_restr;
         __u32                  lr_type; /* LDLM_PLAIN or LDLM_EXTENT */
-        struct ldlm_resource  *lr_root;
         struct ldlm_res_id     lr_name;
         atomic_t               lr_refcount;
 
@@ -436,7 +495,8 @@ do {                                                                          \
         CDEBUG(D_DLMTRACE, "### " format "\n" , ## a)
 
 typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, int *flags,
-                                      int first_enq, ldlm_error_t *err);
+                                      int first_enq, ldlm_error_t *err,
+                                      struct list_head *work_list);
 
 /*
  * Iterators.
@@ -606,4 +666,20 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 #define IOC_LDLM_REGRESS_STOP           _IOWR('f', 43, long)
 #define IOC_LDLM_MAX_NR                 43
 
+static inline void lock_res(struct ldlm_resource *res)
+{
+        spin_lock(&res->lr_lock);
+}
+
+static inline void unlock_res(struct ldlm_resource *res)
+{
+        spin_unlock(&res->lr_lock);
+}
+
+static inline void check_res_locked(struct ldlm_resource *res)
+{
+        LASSERT_SPIN_LOCKED(&res->lr_lock);
+}
+
+
 #endif
index 1fc3263..23aeb88 100644 (file)
@@ -39,6 +39,7 @@ struct osc_creator {
 
 struct ldlm_export_data {
         struct list_head       led_held_locks; /* protected by namespace lock */
+        spinlock_t             led_lock;
 };
 
 struct ec_export_data { /* echo client */
index c4ec73e..6eada1e 100644 (file)
@@ -121,20 +121,6 @@ struct obd_client_handle {
 void statfs_pack(struct obd_statfs *osfs, struct kstatfs *sfs);
 void statfs_unpack(struct kstatfs *sfs, struct obd_statfs *osfs);
 
-/* l_lock.c */
-struct lustre_lock {
-        int l_depth;
-        struct task_struct *l_owner;
-        struct semaphore l_sem;
-        spinlock_t l_spin;
-};
-
-void l_lock_init(struct lustre_lock *);
-void l_lock(struct lustre_lock *);
-void l_unlock(struct lustre_lock *);
-int l_has_lock(struct lustre_lock *);
-
-
 /*
  *   OBD IOCTLS
  */
index 11cd02d..746b485 100644 (file)
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_lib.h>
 
-/* invariants:
- - only the owner of the lock changes l_owner/l_depth
- - if a non-owner changes or checks the variables a spin lock is taken
-*/
-
-void l_lock_init(struct lustre_lock *lock)
-{
-        sema_init(&lock->l_sem, 1);
-        spin_lock_init(&lock->l_spin);
-}
-
-void l_lock(struct lustre_lock *lock)
-{
-        int owner = 0;
-
-        spin_lock(&lock->l_spin);
-        if (lock->l_owner == current)
-                owner = 1;
-        spin_unlock(&lock->l_spin);
-
-        /* This is safe to increment outside the spinlock because we
-         * can only have 1 CPU running on the current task
-         * (i.e. l_owner == current), regardless of the number of CPUs.
-         */
-        if (owner) {
-                ++lock->l_depth;
-        } else {
-                down(&lock->l_sem);
-                spin_lock(&lock->l_spin);
-                lock->l_owner = current;
-                lock->l_depth = 0;
-                spin_unlock(&lock->l_spin);
-        }
-}
-
-void l_unlock(struct lustre_lock *lock)
-{
-        LASSERTF(lock->l_owner == current, "lock %p, current %p\n",
-                 lock->l_owner, current);
-        LASSERTF(lock->l_depth >= 0, "depth %d\n", lock->l_depth);
-        spin_lock(&lock->l_spin);
-        if (--lock->l_depth < 0) {
-                lock->l_owner = NULL;
-                spin_unlock(&lock->l_spin);
-                up(&lock->l_sem);
-                return;
-        }
-        spin_unlock(&lock->l_spin);
-}
-
-int l_has_lock(struct lustre_lock *lock)
-{
-        int depth = -1, owner = 0;
-
-        spin_lock(&lock->l_spin);
-        if (lock->l_owner == current) {
-                depth = lock->l_depth;
-                owner = 1;
-        }
-        spin_unlock(&lock->l_spin);
-
-        if (depth >= 0)
-                CDEBUG(D_INFO, "lock_depth: %d\n", depth);
-        return owner;
-}
-
-#ifdef __KERNEL__
-#include <linux/lustre_version.h>
-void l_check_no_ns_lock(struct ldlm_namespace *ns)
-{
-        static unsigned long next_msg;
-
-        if (l_has_lock(&ns->ns_lock) && time_after(jiffies, next_msg)) {
-                CERROR("namespace %s lock held illegally; tell phil\n",
-                       ns->ns_name);
-                portals_debug_dumpstack(NULL);
-                next_msg = jiffies + 60 * HZ;
-        }
-}
-
-#else
-void l_check_no_ns_lock(struct ldlm_namespace *ns)
-{
-        if (l_has_lock(&ns->ns_lock)) {
-                CERROR("namespace %s lock held illegally; tell phil\n",
-                       ns->ns_name);
-        }
-}
-#endif /* __KERNEL__ */
index 5661d73..391d493 100644 (file)
@@ -156,7 +156,8 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
  */
 static int
 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                         int send_cbs, int *flags, ldlm_error_t *err)
+                         int *flags, ldlm_error_t *err,
+                         struct list_head *work_list)
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
@@ -275,12 +276,12 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         continue;
                 }
 
-                if (!send_cbs)
+                if (!work_list)
                         RETURN(0);
 
                 compat = 0;
                 if (lock->l_blocking_ast)
-                        ldlm_add_ast_work_item(lock, req, NULL, 0);
+                        ldlm_add_ast_work_item(lock, req, work_list);
         }
 
         return(compat);
@@ -301,7 +302,7 @@ destroylock:
   *   - the caller has NOT initialized req->lr_tmp, so we must
   *   - must call this function with the ns lock held once */
 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
-                             ldlm_error_t *err)
+                             ldlm_error_t *err, struct list_head *work_list)
 {
         struct ldlm_resource *res = lock->l_resource;
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
@@ -318,44 +319,38 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                  * flags should always be zero here, and if that ever stops
                  * being true, we want to find out. */
                 LASSERT(*flags == 0);
-                LASSERT(res->lr_tmp != NULL);
-                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0, flags,
-                                              err);
+                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
+                                              err, NULL);
                 if (rc == 1) {
-                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0,
-                                                      flags, err);
+                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
+                                                      flags, err, NULL);
                 }
                 if (rc == 0)
                         RETURN(LDLM_ITER_STOP);
 
                 ldlm_resource_unlink_lock(lock);
-
                 ldlm_extent_policy(res, lock, flags);
-                ldlm_grant_lock(lock, NULL, 0, 1);
+                ldlm_grant_lock(lock, work_list);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
  restart:
-        LASSERT(res->lr_tmp == NULL);
-        res->lr_tmp = &rpc_list;
-        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1, flags, err);
+        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list);
         if (rc < 0)
                 GOTO(out, rc); /* lock was destroyed */
         if (rc == 2) {
-                res->lr_tmp = NULL;
                 goto grant;
         }
 
-        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, 1, flags, err);
+        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list);
         if (rc2 < 0)
                 GOTO(out, rc = rc2); /* lock was destroyed */
-        res->lr_tmp = NULL;
 
         if (rc + rc2 == 2) {
         grant:
                 ldlm_extent_policy(res, lock, flags);
                 ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL, 0, 0);
+                ldlm_grant_lock(lock, NULL);
         } else {
                 /* If either of the compat_queue()s returned failure, then we
                  * have ASTs to send and must go onto the waiting list.
@@ -365,16 +360,15 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                 if (list_empty(&lock->l_res_link))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
-                l_unlock(&res->lr_namespace->ns_lock);
-                rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
-                l_lock(&res->lr_namespace->ns_lock);
+                unlock_res(res);
+                rc = ldlm_run_bl_ast_work(&rpc_list);
+                lock_res(res);
                 if (rc == -ERESTART)
                         GOTO(restart, -ERESTART);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
         }
         rc = 0;
 out:
-        res->lr_tmp = NULL;
         RETURN(rc);
 }
 
index 63fb58c..a86c021 100644 (file)
@@ -119,7 +119,7 @@ restart:
 
 int
 ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
-                        ldlm_error_t *err)
+                        ldlm_error_t *err, struct list_head *work_list)
 {
         struct ldlm_resource *res = req->l_resource;
         struct ldlm_namespace *ns = res->lr_namespace;
@@ -353,7 +353,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                  &new2->l_export->exp_ldlm_data.led_held_locks);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC)
-                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+                        ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
 
                 /* insert new2 at lock */
                 ldlm_resource_add_lock(res, ownlocks, new2);
@@ -387,20 +387,16 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                                     = LIST_HEAD_INIT(rpc_list);
                                 int rc;
 restart:
-                                res->lr_tmp = &rpc_list;
-                                ldlm_reprocess_queue(res, &res->lr_waiting);
-                                res->lr_tmp = NULL;
-
-                                l_unlock(&ns->ns_lock);
-                                rc = ldlm_run_ast_work(res->lr_namespace,
-                                                       &rpc_list);
-                                l_lock(&ns->ns_lock);
+                                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
+                                unlock_res(res);
+                                rc = ldlm_run_cp_ast_work(&rpc_list);
+                                lock_res(res);
                                 if (rc == -ERESTART)
                                         GOTO(restart, -ERESTART);
                        }
                 } else {
                         LASSERT(req->l_completion_ast);
-                        ldlm_add_ast_work_item(req, NULL, NULL, 0);
+                        ldlm_add_ast_work_item(req, NULL, NULL);
                 }
         }
 
@@ -495,7 +491,7 @@ granted:
 
         LDLM_DEBUG(lock, "client-side enqueue granted");
         ns = lock->l_resource->lr_namespace;
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
 
         /* take lock off the deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
@@ -526,28 +522,25 @@ granted:
 
                 /* We need to reprocess the lock to do merges or splits
                  * with existing locks owned by this process. */
-                ldlm_process_flock_lock(lock, &noreproc, 1, &err);
+                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
                 if (flags == 0)
                         wake_up(&lock->l_waitq);
         }
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
         RETURN(0);
 }
 
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                             void *data, int flag)
 {
-        struct ldlm_namespace *ns;
         ENTRY;
                                                                                                                              
         LASSERT(lock);
         LASSERT(flag == LDLM_CB_CANCELING);
                                                                                                                              
-        ns = lock->l_resource->lr_namespace;
-                                                                                                                             
         /* take lock off the deadlock detection waitq. */
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
         list_del_init(&lock->l_flock_waitq);
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
         RETURN(0);
 }
index e3511dd..56c88cf 100644 (file)
@@ -35,7 +35,7 @@
 /* Determine if the lock is compatible with all locks on the queue. */
 static int
 ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                            int send_cbs)
+                            struct list_head *work_list)
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
@@ -61,12 +61,12 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                 if (!(lock->l_policy_data.l_inodebits.bits & req_bits))
                         continue;
 
-                if (!send_cbs)
+                if (!work_list)
                         RETURN(0);
 
                 compat = 0;
                 if (lock->l_blocking_ast)
-                        ldlm_add_ast_work_item(lock, req, NULL, 0);
+                        ldlm_add_ast_work_item(lock, req, work_list);
         }
 
         RETURN(compat);
@@ -82,7 +82,8 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
   *   - the caller has NOT initialized req->lr_tmp, so we must
   *   - must call this function with the ns lock held once */
 int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
-                                int first_enq, ldlm_error_t *err)
+                                int first_enq, ldlm_error_t *err,
+                                struct list_head *work_list)
 {
         struct ldlm_resource *res = lock->l_resource;
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
@@ -90,27 +91,25 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
         ENTRY;
 
         LASSERT(list_empty(&res->lr_converting));
+        check_res_locked(res);
 
         if (!first_enq) {
-                LASSERT(res->lr_tmp != NULL);
-                rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 0);
+                LASSERT(work_list != NULL);
+                rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL);
                 if (!rc)
                         RETURN(LDLM_ITER_STOP);
-                rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 0);
+                rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL);
                 if (!rc)
                         RETURN(LDLM_ITER_STOP);
 
                 ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL, 0, 1);
+                ldlm_grant_lock(lock, work_list);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
  restart:
-        LASSERT(res->lr_tmp == NULL);
-        res->lr_tmp = &rpc_list;
-        rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 1);
-        rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 1);
-        res->lr_tmp = NULL;
+        rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list);
+        rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list);
 
         if (rc != 2) {
                 /* If either of the compat_queue()s returned 0, then we
@@ -121,15 +120,15 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                 if (list_empty(&lock->l_res_link))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
-                l_unlock(&res->lr_namespace->ns_lock);
-                rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
-                l_lock(&res->lr_namespace->ns_lock);
+                unlock_res(res);
+                rc = ldlm_run_bl_ast_work(&rpc_list);
+                lock_res(res);
                 if (rc == -ERESTART)
                         GOTO(restart, -ERESTART);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
         } else {
                 ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL, 0, 0);
+                ldlm_grant_lock(lock, NULL);
         }
         RETURN(0);
 }
index 3a79a52..c6ee99e 100644 (file)
@@ -32,10 +32,11 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync);
 /* ldlm_resource.c */
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                      struct ldlm_lock *new);
+int ldlm_resource_putref_locked(struct ldlm_resource *res);
 
 /* ldlm_lock.c */
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
-                     int run_ast);
+void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list);
+
 struct ldlm_lock *
 ldlm_lock_create(struct ldlm_namespace *ns,
                  struct lustre_handle *parent_lock_handle, struct ldlm_res_id,
@@ -44,12 +45,15 @@ ldlm_lock_create(struct ldlm_namespace *ns,
                  __u32 lvb_len);
 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
                                void *cookie, int *flags);
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, __u32 mode);
 void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
 void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode);
 void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
-                            void *data, int datalen);
-int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue);
-int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list);
+                                struct list_head *work_list);
+int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
+                         struct list_head *work_list);
+int ldlm_run_bl_ast_work(struct list_head *rpc_list);
+int ldlm_run_cp_ast_work(struct list_head *rpc_list);
 
 /* ldlm_lockd.c */
 int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
@@ -59,19 +63,20 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
 
 /* ldlm_plain.c */
 int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
-                            ldlm_error_t *err);
+                            ldlm_error_t *err, struct list_head *work_list);
 
 /* ldlm_extent.c */
 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
-                             ldlm_error_t *err);
+                             ldlm_error_t *err, struct list_head *work_list);
 
 /* ldlm_flock.c */
-int ldlm_process_flock_lock(struct ldlm_lock *lock, int *flags, int first_enq,
-                            ldlm_error_t *err);
+int ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
+                            ldlm_error_t *err, struct list_head *work_list);
 
 /* ldlm_inodebits.c */
 int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
-                               int first_enq, ldlm_error_t *err);
+                               int first_enq, ldlm_error_t *err,
+                                struct list_head *work);
 
 /* l_lock.c */
 void l_check_no_ns_lock(struct ldlm_namespace *ns);
index 6d2dae8..d73b52a 100644 (file)
@@ -85,7 +85,6 @@ char *ldlm_it2str(int it)
 }
 
 extern kmem_cache_t *ldlm_lock_slab;
-struct lustre_lock ldlm_handle_lock;
 
 static ldlm_processing_policy ldlm_processing_policy_table[] = {
         [LDLM_PLAIN] ldlm_process_plain_lock,
@@ -127,31 +126,33 @@ void ldlm_lock_put(struct ldlm_lock *lock)
 {
         ENTRY;
 
+        LASSERT(lock->l_resource != LP_POISON);
+        LASSERT(atomic_read(&lock->l_refc) > 0);
         if (atomic_dec_and_test(&lock->l_refc)) {
-                struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+                struct ldlm_resource *res = lock->l_resource;
+                struct ldlm_namespace *ns = res->lr_namespace;
 
-                l_lock(&ns->ns_lock);
                 LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
+
+                LASSERT(lock->l_resource != LP_POISON);
+                lock_res(res);
                 LASSERT(lock->l_destroyed);
                 LASSERT(list_empty(&lock->l_res_link));
 
-                spin_lock(&ns->ns_counter_lock);
-                ns->ns_locks--;
-                spin_unlock(&ns->ns_counter_lock);
+                if (lock->l_parent)
+                        LDLM_LOCK_PUT(lock->l_parent);
+                unlock_res(res);
 
                 ldlm_resource_putref(lock->l_resource);
                 lock->l_resource = NULL;
                 if (lock->l_export)
                         class_export_put(lock->l_export);
-
-                if (lock->l_parent)
-                        LDLM_LOCK_PUT(lock->l_parent);
+                atomic_dec(&ns->ns_locks);
 
                 if (lock->l_lvb_data != NULL)
                         OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);
 
                 OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
-                l_unlock(&ns->ns_lock);
         }
 
         EXIT;
@@ -160,14 +161,14 @@ void ldlm_lock_put(struct ldlm_lock *lock)
 void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
 {
         ENTRY;
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        spin_lock(&lock->l_resource->lr_namespace->ns_unused_lock);
         if (!list_empty(&lock->l_lru)) {
                 LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                 list_del_init(&lock->l_lru);
                 lock->l_resource->lr_namespace->ns_nr_unused--;
                 LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
         }
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        spin_unlock(&lock->l_resource->lr_namespace->ns_unused_lock);
         EXIT;
 }
 
@@ -179,7 +180,8 @@ void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
 void ldlm_lock_destroy(struct ldlm_lock *lock)
 {
         ENTRY;
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
+        lock_res(lock->l_resource);
 
         if (!list_empty(&lock->l_children)) {
                 LDLM_ERROR(lock, "still has children (%p)!",
@@ -201,13 +203,21 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
 
         if (lock->l_destroyed) {
                 LASSERT(list_empty(&lock->l_lru));
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                unlock_res(lock->l_resource);
                 EXIT;
                 return;
         }
         lock->l_destroyed = 1;
 
-        list_del_init(&lock->l_export_chain);
+        if (lock->l_export) {
+                spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
+                if (!list_empty(&lock->l_export_chain))
+                        list_del_init(&lock->l_export_chain);
+                spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
+        } else {
+                LASSERT(list_empty(&lock->l_export_chain));
+        }       
+
         ldlm_lock_remove_from_lru(lock);
         class_handle_unhash(&lock->l_handle);
 
@@ -222,7 +232,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
                 lock->l_completion_ast(lock, 0);
 #endif
 
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -261,17 +271,18 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         INIT_LIST_HEAD(&lock->l_export_chain);
         INIT_LIST_HEAD(&lock->l_pending_chain);
         INIT_LIST_HEAD(&lock->l_tmp);
+        INIT_LIST_HEAD(&lock->l_bl_ast);
+        INIT_LIST_HEAD(&lock->l_cp_ast);
         init_waitqueue_head(&lock->l_waitq);
+        lock->l_blocking_lock = NULL;
 
-        spin_lock(&resource->lr_namespace->ns_counter_lock);
-        resource->lr_namespace->ns_locks++;
-        spin_unlock(&resource->lr_namespace->ns_counter_lock);
+        atomic_inc(&resource->lr_namespace->ns_locks);
 
         if (parent != NULL) {
-                l_lock(&parent->l_resource->lr_namespace->ns_lock);
+                spin_lock(&resource->lr_namespace->ns_hash_lock);
                 lock->l_parent = LDLM_LOCK_GET(parent);
                 list_add(&lock->l_childof, &parent->l_children);
-                l_unlock(&parent->l_resource->lr_namespace->ns_lock);
+                spin_unlock(&resource->lr_namespace->ns_hash_lock);
         }
 
         INIT_LIST_HEAD(&lock->l_handle.h_link);
@@ -286,11 +297,11 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         struct ldlm_resource *oldres = lock->l_resource;
         ENTRY;
 
-        l_lock(&ns->ns_lock);
+        lock_res(oldres);
         if (memcmp(&new_resid, &lock->l_resource->lr_name,
                    sizeof(lock->l_resource->lr_name)) == 0) {
                 /* Nothing to do */
-                l_unlock(&ns->ns_lock);
+                unlock_res(oldres);
                 RETURN(0);
         }
 
@@ -307,10 +318,11 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                 RETURN(-ENOMEM);
         }
 
+        unlock_res(oldres);
+
         /* ...and the flowers are still standing! */
         ldlm_resource_putref(oldres);
 
-        l_unlock(&ns->ns_lock);
         RETURN(0);
 }
 
@@ -343,17 +355,19 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
         ns = lock->l_resource->lr_namespace;
         LASSERT(ns != NULL);
 
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
 
         /* It's unlikely but possible that someone marked the lock as
          * destroyed after we did handle2object on it */
         if (lock->l_destroyed) {
+                unlock_res(lock->l_resource);
                 CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                 LDLM_LOCK_PUT(lock);
                 GOTO(out, retval);
         }
 
         if (flags && (lock->l_flags & flags)) {
+                unlock_res(lock->l_resource);
                 LDLM_LOCK_PUT(lock);
                 GOTO(out, retval);
         }
@@ -361,10 +375,10 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
         if (flags)
                 lock->l_flags |= flags;
 
+        unlock_res(lock->l_resource);
         retval = lock;
         EXIT;
  out:
-        l_unlock(&ns->ns_lock);
         return retval;
 }
 
@@ -372,11 +386,7 @@ struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
                                       struct lustre_handle *handle)
 {
         struct ldlm_lock *retval = NULL;
-
-        l_lock(&ns->ns_lock);
         retval = __ldlm_handle2lock(handle, 0);
-        l_unlock(&ns->ns_lock);
-
         return retval;
 }
 
@@ -389,42 +399,46 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
                sizeof(desc->l_policy_data));
 }
 
-void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
-                            void *data, int datalen)
+void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
+                           struct list_head *work_list)
 {
-        struct ldlm_ast_work *w;
-        ENTRY;
-
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        if (new && (lock->l_flags & LDLM_FL_AST_SENT))
-                GOTO(out, 0);
-
-        CDEBUG(D_OTHER, "lock %p incompatible; sending blocking AST.\n", lock);
-
-        OBD_ALLOC(w, sizeof(*w));
-        if (!w) {
-                LBUG();
-                GOTO(out, 0);
-        }
-
-        w->w_data = data;
-        w->w_datalen = datalen;
-        if (new) {
+        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                 LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                 lock->l_flags |= LDLM_FL_AST_SENT;
                 /* If the enqueuing client said so, tell the AST recipient to
                  * discard dirty data, rather than writing back. */
                 if (new->l_flags & LDLM_AST_DISCARD_DATA)
                         lock->l_flags |= LDLM_FL_DISCARD_DATA;
-                w->w_blocking = 1;
-                ldlm_lock2desc(new, &w->w_desc);
+                LASSERT(list_empty(&lock->l_bl_ast));
+                list_add(&lock->l_bl_ast, work_list);
+                LDLM_LOCK_GET(lock);
+                LASSERT(lock->l_blocking_lock == NULL);
+                lock->l_blocking_lock = LDLM_LOCK_GET(new);
         }
+}
+
+void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
+{
+        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
+                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
+                lock->l_flags |= LDLM_FL_CP_REQD;
+                LASSERT(list_empty(&lock->l_cp_ast));
+                list_add(&lock->l_cp_ast, work_list);
+                LDLM_LOCK_GET(lock);
+        }
+}
 
-        w->w_lock = LDLM_LOCK_GET(lock);
-        list_add(&w->w_list, lock->l_resource->lr_tmp);
+/* must be called with lr_lock held */
+void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
+                                struct list_head *work_list)
+{
+        ENTRY;
+        check_res_locked(lock->l_resource);
+        if (new)
+                ldlm_add_bl_work_item(lock, new, work_list);
+        else 
+                ldlm_add_cp_work_item(lock, work_list);
         EXIT;
- out:
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 }
 
 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
@@ -436,10 +450,8 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
         LDLM_LOCK_PUT(lock);
 }
 
-/* only called for local locks */
-void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
 {
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         ldlm_lock_remove_from_lru(lock);
         if (mode & (LCK_NL | LCK_CR | LCK_PR))
                 lock->l_readers++;
@@ -448,7 +460,14 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
         lock->l_last_used = jiffies;
         LDLM_LOCK_GET(lock);
         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+}
+
+/* only called for local locks */
+void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
+{
+        lock_res(lock->l_resource);
+        ldlm_lock_addref_internal_nolock(lock, mode);
+        unlock_res(lock->l_resource);
 }
 
 void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
@@ -458,7 +477,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
 
         ns = lock->l_resource->lr_namespace;
 
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
+
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                 LASSERT(lock->l_readers > 0);
@@ -489,8 +509,9 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
 
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 ldlm_lock_remove_from_lru(lock);
-                l_unlock(&ns->ns_lock);
-                if (ldlm_bl_to_thread(ns, NULL, lock) != 0)
+                unlock_res(lock->l_resource);
+                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
+                                ldlm_bl_to_thread(ns, NULL, lock) != 0)
                         ldlm_handle_bl_callback(ns, NULL, lock);
         } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
                    !lock->l_readers && !lock->l_writers) {
@@ -498,12 +519,14 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                  * reference, put it on the LRU. */
                 LASSERT(list_empty(&lock->l_lru));
                 LASSERT(ns->ns_nr_unused >= 0);
+                spin_lock(&ns->ns_unused_lock);
                 list_add_tail(&lock->l_lru, &ns->ns_unused_list);
                 ns->ns_nr_unused++;
-                l_unlock(&ns->ns_lock);
+                spin_unlock(&ns->ns_unused_lock);
+                unlock_res(lock->l_resource);
                 ldlm_cancel_lru(ns, LDLM_ASYNC);
         } else {
-                l_unlock(&ns->ns_lock);
+                unlock_res(lock->l_resource);
         }
 
         LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
@@ -529,9 +552,9 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
         LASSERT(lock != NULL);
 
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
         lock->l_flags |= LDLM_FL_CBPENDING;
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
         ldlm_lock_decref_internal(lock, mode);
         LDLM_LOCK_PUT(lock);
 }
@@ -540,24 +563,25 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
  *  - ldlm_lock_enqueue
  *  - ldlm_reprocess_queue
  *  - ldlm_lock_convert
+ *
+ * must be called with lr_lock held
  */
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
-                     int run_ast)
+void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
 {
         struct ldlm_resource *res = lock->l_resource;
         ENTRY;
 
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        check_res_locked(res);
+
         lock->l_granted_mode = lock->l_req_mode;
         ldlm_resource_add_lock(res, &res->lr_granted, lock);
 
         if (lock->l_granted_mode < res->lr_most_restr)
                 res->lr_most_restr = lock->l_granted_mode;
 
-        if (run_ast && lock->l_completion_ast != NULL)
-                ldlm_add_ast_work_item(lock, NULL, data, datalen);
+        if (work_list && lock->l_completion_ast != NULL)
+                ldlm_add_ast_work_item(lock, NULL, work_list);
 
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         EXIT;
 }
 
@@ -621,7 +645,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                 if (flags & LDLM_FL_TEST_LOCK)
                         LDLM_LOCK_GET(lock);
                 else
-                        ldlm_lock_addref_internal(lock, mode);
+                        ldlm_lock_addref_internal_nolock(lock, mode);
                 return lock;
         }
 
@@ -630,10 +654,10 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
 
 void ldlm_lock_allow_match(struct ldlm_lock *lock)
 {
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
         lock->l_flags |= LDLM_FL_CAN_MATCH;
         wake_up(&lock->l_waitq);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 }
 
 /* Can be called in two ways:
@@ -682,7 +706,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                 RETURN(0);
         }
 
-        l_lock(&ns->ns_lock);
+        lock_res(res);
 
         lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
         if (lock != NULL)
@@ -698,8 +722,8 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 
         EXIT;
  out:
+        unlock_res(res);
         ldlm_resource_putref(res);
-        l_unlock(&ns->ns_lock);
 
         if (lock) {
                 ldlm_lock2handle(lock, lockh);
@@ -725,13 +749,11 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 
 out2:
         if (rc) {
-                l_lock(&ns->ns_lock);
                 LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                            type == LDLM_PLAIN ? res_id->name[2] :
                                 policy->l_extent.start,
                            type == LDLM_PLAIN ? res_id->name[3] :
                            policy->l_extent.end);
-                l_unlock(&ns->ns_lock);
         } else if (!(flags & LDLM_FL_TEST_LOCK)) {/* less verbose for test-only */
                 LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                   LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
@@ -837,7 +859,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 LASSERT(rc == ELDLM_OK);
         }
 
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
         if (local && lock->l_req_mode == lock->l_granted_mode) {
                 /* The server returned a blocked lock, but it was granted before
                  * we got a chance to actually enqueue it.  We don't need to do
@@ -869,7 +891,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                 else
-                        ldlm_grant_lock(lock, NULL, 0, 0);
+                        ldlm_grant_lock(lock, NULL);
                 GOTO(out, ELDLM_OK);
         } else if (*flags & LDLM_FL_REPLAY) {
                 if (*flags & LDLM_FL_BLOCK_CONV) {
@@ -879,22 +901,23 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                         GOTO(out, ELDLM_OK);
                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
-                        ldlm_grant_lock(lock, NULL, 0, 0);
+                        ldlm_grant_lock(lock, NULL);
                         GOTO(out, ELDLM_OK);
                 }
                 /* If no flags, fall through to normal enqueue path. */
         }
 
         policy = ldlm_processing_policy_table[res->lr_type];
-        policy(lock, flags, 1, &rc);
+        policy(lock, flags, 1, &rc, NULL);
         EXIT;
 out:
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
         return rc;
 }
 
 /* Must be called with namespace taken: queue is waiting or converting. */
-int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
+int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
+                         struct list_head *work_list)
 {
         struct list_head *tmp, *pos;
         ldlm_processing_policy policy;
@@ -903,6 +926,8 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
         ldlm_error_t err;
         ENTRY;
 
+        check_res_locked(res);
+
         policy = ldlm_processing_policy_table[res->lr_type];
         LASSERT(policy);
 
@@ -913,7 +938,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
                 flags = 0;
-                rc = policy(pending, &flags, 0, &err);
+                rc = policy(pending, &flags, 0, &err, work_list);
                 if (rc != LDLM_ITER_CONTINUE)
                         break;
         }
@@ -921,49 +946,80 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
         RETURN(rc);
 }
 
-int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list)
+int ldlm_run_bl_ast_work(struct list_head *rpc_list)
 {
         struct list_head *tmp, *pos;
-        int rc, retval = 0;
+        struct ldlm_lock_desc d;
+        int rc = 0, retval = 0;
         ENTRY;
 
-        l_check_no_ns_lock(ns);
+        list_for_each_safe(tmp, pos, rpc_list) {
+                struct ldlm_lock *lock =
+                        list_entry(tmp, struct ldlm_lock, l_bl_ast);
+
+                /* nobody should touch l_bl_ast */
+                lock_res(lock->l_resource);
+                list_del_init(&lock->l_bl_ast);
+
+                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+                LASSERT(lock->l_bl_ast_run == 0);
+                LASSERT(lock->l_blocking_lock);
+                lock->l_bl_ast_run++;
+                unlock_res(lock->l_resource);
+
+                ldlm_lock2desc(lock->l_blocking_lock, &d);
+
+                LDLM_LOCK_PUT(lock->l_blocking_lock);
+                lock->l_blocking_lock = NULL;
+                rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING);
+
+                if (rc == -ERESTART)
+                        retval = rc;
+                else if (rc)
+                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
+                               "disconnect client\n");
+                LDLM_LOCK_PUT(lock);
+        }
+        RETURN(retval);
+}
+
+int ldlm_run_cp_ast_work(struct list_head *rpc_list)
+{
+        struct list_head *tmp, *pos;
+        int rc = 0, retval = 0;
+        ENTRY;
+
+        /* It's possible to receive a completion AST before we've set
+         * the l_completion_ast pointer: either because the AST arrived
+         * before the reply, or simply because there's a small race
+         * window between receiving the reply and finishing the local
+         * enqueue. (bug 842)
+         *
+         * This can't happen with the blocking_ast, however, because we
+         * will never call the local blocking_ast until we drop our
+         * reader/writer reference, which we won't do until we get the
+         * reply and finish enqueueing. */
 
         list_for_each_safe(tmp, pos, rpc_list) {
-                struct ldlm_ast_work *w =
-                        list_entry(tmp, struct ldlm_ast_work, w_list);
-
-                /* It's possible to receive a completion AST before we've set
-                 * the l_completion_ast pointer: either because the AST arrived
-                 * before the reply, or simply because there's a small race
-                 * window between receiving the reply and finishing the local
-                 * enqueue. (bug 842)
-                 *
-                 * This can't happen with the blocking_ast, however, because we
-                 * will never call the local blocking_ast until we drop our
-                 * reader/writer reference, which we won't do until we get the
-                 * reply and finish enqueueing. */
-                LASSERT(w->w_lock != NULL);
-                if (w->w_blocking) {
-                        LASSERT(w->w_lock->l_blocking_ast != NULL);
-                        rc = w->w_lock->l_blocking_ast
-                                (w->w_lock, &w->w_desc, w->w_data,
-                                 LDLM_CB_BLOCKING);
-                } else if (w->w_lock->l_completion_ast != NULL) {
-                        LASSERT(w->w_lock->l_completion_ast != NULL);
-                        rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags,
-                                                         w->w_data);
-                } else {
-                        rc = 0;
-                }
+                struct ldlm_lock *lock =
+                        list_entry(tmp, struct ldlm_lock, l_cp_ast);
+
+                /* nobody should touch l_cp_ast */
+                lock_res(lock->l_resource);
+                list_del_init(&lock->l_cp_ast);
+                LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
+                lock->l_flags &= ~LDLM_FL_CP_REQD;
+                unlock_res(lock->l_resource);
+
+                if (lock->l_completion_ast != NULL)
+                        rc = lock->l_completion_ast(lock, 0, 0);
+
                 if (rc == -ERESTART)
                         retval = rc;
                 else if (rc)
                         CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
                                "disconnect client\n");
-                LDLM_LOCK_PUT(w->w_lock);
-                list_del(&w->w_list);
-                OBD_FREE(w, sizeof(*w));
+                LDLM_LOCK_PUT(lock);
         }
         RETURN(retval);
 }
@@ -976,27 +1032,31 @@ static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
 
 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
 {
+        struct list_head *tmp;
         int i, rc;
 
-        l_lock(&ns->ns_lock);
+        spin_lock(&ns->ns_hash_lock);
         for (i = 0; i < RES_HASH_SIZE; i++) {
-                struct list_head *tmp, *next;
-                list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
+                tmp = ns->ns_hash[i].next;
+                while (tmp != &(ns->ns_hash[i])) {
                         struct ldlm_resource *res =
                                 list_entry(tmp, struct ldlm_resource, lr_hash);
 
                         ldlm_resource_getref(res);
-                        l_unlock(&ns->ns_lock);
+                        spin_unlock(&ns->ns_hash_lock);
+
                         rc = reprocess_one_queue(res, NULL);
-                        l_lock(&ns->ns_lock);
-                        next = tmp->next;
-                        ldlm_resource_putref(res);
+
+                        spin_lock(&ns->ns_hash_lock);
+                        tmp = tmp->next;
+                        ldlm_resource_putref_locked(res);
+
                         if (rc == LDLM_ITER_STOP)
                                 GOTO(out, rc);
                 }
         }
  out:
-        l_unlock(&ns->ns_lock);
+        spin_unlock(&ns->ns_hash_lock);
         EXIT;
 }
 
@@ -1013,17 +1073,13 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         }
 
  restart:
-        l_lock(&res->lr_namespace->ns_lock);
-        res->lr_tmp = &rpc_list;
-
-        rc = ldlm_reprocess_queue(res, &res->lr_converting);
+        lock_res(res);
+        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
         if (rc == LDLM_ITER_CONTINUE)
-                ldlm_reprocess_queue(res, &res->lr_waiting);
-
-        res->lr_tmp = NULL;
-        l_unlock(&res->lr_namespace->ns_lock);
+                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
+        unlock_res(res);
 
-        rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
+        rc = ldlm_run_cp_ast_work(&rpc_list);
         if (rc == -ERESTART) {
                 LASSERT(list_empty(&rpc_list));
                 goto restart;
@@ -1033,20 +1089,19 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
 
 void ldlm_cancel_callback(struct ldlm_lock *lock)
 {
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        check_res_locked(lock->l_resource);
+
         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                 lock->l_flags |= LDLM_FL_CANCEL;
                 if (lock->l_blocking_ast) {
-                        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                        // l_check_no_ns_lock(lock->l_resource->lr_namespace);
+                        unlock_res(lock->l_resource);
                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                              LDLM_CB_CANCELING);
-                        return;
+                        lock_res(lock->l_resource);
                 } else {
                         LDLM_DEBUG(lock, "no blocking ast");
                 }
         }
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 }
 
 void ldlm_lock_cancel(struct ldlm_lock *lock)
@@ -1058,9 +1113,9 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         res = lock->l_resource;
         ns = res->lr_namespace;
 
-        l_lock(&ns->ns_lock);
         ldlm_del_waiting_lock(lock);
-
+        lock_res(res);
+        
         /* Please do not, no matter how tempting, remove this LBUG without
          * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
@@ -1071,8 +1126,10 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ldlm_cancel_callback(lock);
 
         ldlm_resource_unlink_lock(lock);
+        unlock_res(res);
+        
         ldlm_lock_destroy(lock);
-        l_unlock(&ns->ns_lock);
+
         EXIT;
 }
 
@@ -1091,23 +1148,26 @@ int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
 
 void ldlm_cancel_locks_for_export(struct obd_export *exp)
 {
-        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         struct ldlm_lock *lock;
         struct ldlm_resource *res;
 
-        l_lock(&ns->ns_lock);
+        spin_lock(&exp->exp_ldlm_data.led_lock);
         while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) { 
                 lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
                                   struct ldlm_lock, l_export_chain);
                 res = ldlm_resource_getref(lock->l_resource);
+                LDLM_LOCK_GET(lock);
+                spin_unlock(&exp->exp_ldlm_data.led_lock);
+
                 LDLM_DEBUG(lock, "export %p", exp);
                 ldlm_lock_cancel(lock);
-                l_unlock(&ns->ns_lock);
                 ldlm_reprocess_all(res);
+
                 ldlm_resource_putref(res);
-                l_lock(&ns->ns_lock);
+                LDLM_LOCK_PUT(lock);
+                spin_lock(&exp->exp_ldlm_data.led_lock);
         }
-        l_unlock(&ns->ns_lock);
+        spin_unlock(&exp->exp_ldlm_data.led_lock);
 }
 
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
@@ -1132,7 +1192,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
         res = lock->l_resource;
         ns = res->lr_namespace;
 
-        l_lock(&ns->ns_lock);
+        lock_res(res);
 
         old_mode = lock->l_req_mode;
         lock->l_req_mode = new_mode;
@@ -1149,9 +1209,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                    *flags);
                         LBUG();
 
-                        res->lr_tmp = &rpc_list;
-                        ldlm_grant_lock(lock, NULL, 0, 0);
-                        res->lr_tmp = NULL;
+                        ldlm_grant_lock(lock, &rpc_list);
                         granted = 1;
                         /* FIXME: completion handling not with ns_lock held ! */
                         if (lock->l_completion_ast)
@@ -1161,9 +1219,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                 int pflags = 0;
                 ldlm_processing_policy policy;
                 policy = ldlm_processing_policy_table[res->lr_type];
-                res->lr_tmp = &rpc_list;
-                rc = policy(lock, &pflags, 0, &err);
-                res->lr_tmp = NULL;
+                rc = policy(lock, &pflags, 0, &err, &rpc_list);
                 if (rc == LDLM_ITER_STOP) {
                         lock->l_req_mode = old_mode;
                         ldlm_resource_add_lock(res, &res->lr_granted, lock);
@@ -1173,11 +1229,10 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                         granted = 1;
                 }
         }
-
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
 
         if (granted)
-                ldlm_run_ast_work(ns, &rpc_list);
+                ldlm_run_cp_ast_work(&rpc_list);
         RETURN(res);
 }
 
index 340891d..a7275d0 100644 (file)
@@ -42,7 +42,6 @@
 
 extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
-extern struct lustre_lock ldlm_handle_lock;
 extern struct list_head ldlm_namespace_list;
 
 static DECLARE_MUTEX(ldlm_ref_sem);
@@ -361,9 +360,7 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                         ldlm_lock_cancel(lock);
                         rc = -ERESTART;
                 } else {
-                        l_lock(&lock->l_resource->lr_namespace->ns_lock);
                         ldlm_del_waiting_lock(lock);
-                        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                         ldlm_failed_ast(lock, rc, ast_type);
                 }
         } else if (rc) {
@@ -405,21 +402,6 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         LASSERT(lock);
 
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        if (lock->l_granted_mode != lock->l_req_mode) {
-                /* this blocking AST will be communicated as part of the
-                 * completion AST instead */
-                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                RETURN(0);
-        }
-
-        if (lock->l_destroyed) {
-                /* What's the point? */
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                RETURN(0);
-        }
-
 #if 0
         if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
                 ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
@@ -431,9 +413,24 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK,
                               1, &size, NULL);
-        if (req == NULL) {
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        if (req == NULL)
                 RETURN(-ENOMEM);
+
+        lock_res(lock->l_resource);
+        if (lock->l_granted_mode != lock->l_req_mode) {
+                /* this blocking AST will be communicated as part of the
+                 * completion AST instead */
+                unlock_res(lock->l_resource);
+                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
+                ptlrpc_req_finished(req);
+                RETURN(0);
+        }
+
+        if (lock->l_destroyed) {
+                /* What's the point? */
+                unlock_res(lock->l_resource);
+                ptlrpc_req_finished(req);
+                RETURN(0);
         }
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
@@ -447,7 +444,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         if (lock->l_granted_mode == lock->l_req_mode)
                 ldlm_add_waiting_lock(lock);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
@@ -484,12 +481,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (total_enqueue_wait / 1000000 > obd_timeout)
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
-        down(&lock->l_resource->lr_lvb_sem);
+        lock_res(lock->l_resource);
         if (lock->l_resource->lr_lvb_len) {
                 buffers = 2;
                 size[1] = lock->l_resource->lr_lvb_len;
         }
-        up(&lock->l_resource->lr_lvb_sem);
+        unlock_res(lock->l_resource);
         
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
@@ -506,13 +503,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (buffers == 2) {
                 void *lvb;
                 
-                down(&lock->l_resource->lr_lvb_sem);
                 lvb = lustre_msg_buf(req->rq_reqmsg, 1,
                                      lock->l_resource->lr_lvb_len);
-
+                lock_res(lock->l_resource);
                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                        lock->l_resource->lr_lvb_len);
-                up(&lock->l_resource->lr_lvb_sem);
+                unlock_res(lock->l_resource);
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
@@ -523,12 +519,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
 
         /* We only send real blocking ASTs after the lock is granted */
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
         if (lock->l_flags & LDLM_FL_AST_SENT) {
                 body->lock_flags |= LDLM_FL_AST_SENT;
                 ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
         }
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 
         rc = ptlrpc_queue_wait(req);
         if (rc != 0)
@@ -560,9 +556,9 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
                sizeof(body->lock_handle1));
         ldlm_lock2desc(lock, &body->lock_desc);
 
-       down(&lock->l_resource->lr_lvb_sem);
+       lock_res(lock->l_resource);
         size = lock->l_resource->lr_lvb_len;
-       up(&lock->l_resource->lr_lvb_sem);
+       unlock_res(lock->l_resource);
         req->rq_replen = lustre_msg_size(1, &size);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
@@ -583,20 +579,19 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 static struct ldlm_lock *
 find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl)
 {
-        struct obd_device *obd = exp->exp_obd;
         struct list_head *iter;
 
-        l_lock(&obd->obd_namespace->ns_lock);
+        spin_lock(&exp->exp_ldlm_data.led_lock);
         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
                 struct ldlm_lock *lock;
                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
                 if (lock->l_remote_handle.cookie == remote_hdl->cookie) {
                         LDLM_LOCK_GET(lock);
-                        l_unlock(&obd->obd_namespace->ns_lock);
+                        spin_unlock(&exp->exp_ldlm_data.led_lock);
                         return lock;
                 }
         }
-        l_unlock(&obd->obd_namespace->ns_lock);
+        spin_unlock(&exp->exp_ldlm_data.led_lock);
         return NULL;
 }
 
@@ -657,17 +652,16 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 
         LASSERT(req->rq_export);
         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (req->rq_export->exp_failed) {
                 LDLM_ERROR(lock,"lock on destroyed export %p\n",req->rq_export);
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 GOTO(out, err = -ENOTCONN);
         }
-        lock->l_export = class_export_get(req->rq_export);
 
+        lock->l_export = class_export_get(req->rq_export);
+        spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
         list_add(&lock->l_export_chain,
                  &lock->l_export->exp_ldlm_data.led_held_locks);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
 
 existing_lock:
 
@@ -677,12 +671,12 @@ existing_lock:
                 cookie = req;
         } else {
                 int buffers = 1;
-                down(&lock->l_resource->lr_lvb_sem);
+                lock_res(lock->l_resource);
                 if (lock->l_resource->lr_lvb_len) {
                         size[1] = lock->l_resource->lr_lvb_len;
                         buffers = 2;
                 }
-                up(&lock->l_resource->lr_lvb_sem);
+                unlock_res(lock->l_resource);
                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                         GOTO(out, rc = -ENOMEM);
 
@@ -711,13 +705,13 @@ existing_lock:
 
         /* We never send a blocking AST until the lock is granted, but
          * we can tell it right now */
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
         if (lock->l_flags & LDLM_FL_AST_SENT) {
                 dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                 if (lock->l_granted_mode == lock->l_req_mode)
                         ldlm_add_waiting_lock(lock);
         }
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 
         EXIT;
  out:
@@ -732,13 +726,11 @@ existing_lock:
         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
         if (lock) {
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                            "(err=%d, rc=%d)", err, rc);
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
                 if (rc == 0) {
-                        down(&lock->l_resource->lr_lvb_sem);
+                        lock_res(lock->l_resource);
                         size[1] = lock->l_resource->lr_lvb_len;
                         if (size[1] > 0) {
                                 void *lvb = lustre_msg_buf(req->rq_repmsg,
@@ -749,7 +741,7 @@ existing_lock:
                                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                                        size[1]);
                         }
-                        up(&lock->l_resource->lr_lvb_sem);
+                        unlock_res(lock->l_resource);
                 } else {
                         ldlm_lock_destroy(lock);
                 }
@@ -793,18 +785,14 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
         } else {
                 void *res = NULL;
 
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_DEBUG(lock, "server-side convert handler START");
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
                 do_gettimeofday(&lock->l_enqueued_time);
                 res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                         (int *)&dlm_rep->lock_flags);
                 if (res) {
-                        l_lock(&lock->l_resource->lr_namespace->ns_lock);
                         if (ldlm_del_waiting_lock(lock))
-                                CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
-                        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                                LDLM_DEBUG(lock, "converted waiting lock");
                         req->rq_status = 0;
                 } else {
                         req->rq_status = EDEADLOCK;
@@ -814,9 +802,7 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
         if (lock) {
                 if (!req->rq_status)
                         ldlm_reprocess_all(lock->l_resource);
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_DEBUG(lock, "server-side convert handler END");
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_LOCK_PUT(lock);
         } else
                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
@@ -866,11 +852,9 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                                 //(res, req->rq_reqmsg, 1);
                 }
 
-                l_lock(&res->lr_namespace->ns_lock);
                 ldlm_lock_cancel(lock);
                 if (ldlm_del_waiting_lock(lock))
                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
-                l_unlock(&res->lr_namespace->ns_lock);
                 req->rq_status = rc;
         }
 
@@ -879,9 +863,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 
         if (lock) {
                 ldlm_reprocess_all(lock->l_resource);
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_DEBUG(lock, "server-side cancel handler END");
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_LOCK_PUT(lock);
         }
 
@@ -894,29 +876,25 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
         int do_ast;
         ENTRY;
 
-        l_lock(&ns->ns_lock);
         LDLM_DEBUG(lock, "client blocking AST callback handler START");
-
+        
+        lock_res(lock->l_resource);
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
+        unlock_res(lock->l_resource);
 
         if (do_ast) {
                 LDLM_DEBUG(lock, "already unused, calling "
                            "callback (%p)", lock->l_blocking_ast);
-                if (lock->l_blocking_ast != NULL) {
-                        l_unlock(&ns->ns_lock);
-                        l_check_no_ns_lock(ns);
+                if (lock->l_blocking_ast != NULL)
                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
                                              LDLM_CB_BLOCKING);
-                        l_lock(&ns->ns_lock);
-                }
         } else {
                 LDLM_DEBUG(lock, "Lock still has references, will be"
                            " cancelled later");
         }
 
         LDLM_DEBUG(lock, "client blocking callback handler END");
-        l_unlock(&ns->ns_lock);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -926,12 +904,14 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                     struct ldlm_request *dlm_req,
                                     struct ldlm_lock *lock)
 {
+        struct ldlm_resource *res = lock->l_resource;
         LIST_HEAD(ast_list);
         ENTRY;
 
-        l_lock(&ns->ns_lock);
         LDLM_DEBUG(lock, "client completion callback handler START");
 
+        lock_res(res);
+
         /* If we receive the completion AST before the actual enqueue returned,
          * then we might need to switch lock modes, resources, or extents. */
         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
@@ -949,9 +929,11 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                    &lock->l_resource->lr_name,
                    sizeof(lock->l_resource->lr_name)) != 0) {
+                unlock_res(res);
                 ldlm_lock_change_resource(ns, lock,
                                          dlm_req->lock_desc.l_resource.lr_name);
                 LDLM_DEBUG(lock, "completion AST, new resource");
+                lock_res(res);
         }
 
         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
@@ -971,14 +953,13 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 }
         }
 
-        lock->l_resource->lr_tmp = &ast_list;
-        ldlm_grant_lock(lock, req, sizeof(*req), 1);
-        lock->l_resource->lr_tmp = NULL;
+        ldlm_grant_lock(lock, &ast_list);
+        unlock_res(res);
+
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
-        l_unlock(&ns->ns_lock);
         LDLM_LOCK_PUT(lock);
 
-        ldlm_run_ast_work(ns, &ast_list);
+        ldlm_run_cp_ast_work(&ast_list);
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
@@ -993,15 +974,10 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
         int rc = -ENOSYS;
         ENTRY;
 
-        l_lock(&ns->ns_lock);
         LDLM_DEBUG(lock, "client glimpse AST callback handler");
 
-        if (lock->l_glimpse_ast != NULL) {
-                l_unlock(&ns->ns_lock);
-                l_check_no_ns_lock(ns);
+        if (lock->l_glimpse_ast != NULL)
                 rc = lock->l_glimpse_ast(lock, req);
-                l_lock(&ns->ns_lock);
-        }
 
         if (req->rq_repmsg != NULL) {
                 ptlrpc_reply(req);
@@ -1010,16 +986,18 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                 ptlrpc_error(req);
         }
 
-        l_unlock(&ns->ns_lock);
+        lock_res(lock->l_resource);
         if (lock->l_granted_mode == LCK_PW &&
             !lock->l_readers && !lock->l_writers &&
             time_after(jiffies, lock->l_last_used + 10 * HZ)) {
+                unlock_res(lock->l_resource);
                 if (ldlm_bl_to_thread(ns, NULL, lock))
                         ldlm_handle_bl_callback(ns, NULL, lock);
 
                 EXIT;
                 return;
         }
+        unlock_res(lock->l_resource);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -1575,8 +1553,6 @@ int __init ldlm_init(void)
                 return -ENOMEM;
         }
 
-        l_lock_init(&ldlm_handle_lock);
-
         return 0;
 }
 
@@ -1661,10 +1637,6 @@ EXPORT_SYMBOL(ldlm_dump_all_namespaces);
 EXPORT_SYMBOL(ldlm_resource_get);
 EXPORT_SYMBOL(ldlm_resource_putref);
 
-/* l_lock.c */
-EXPORT_SYMBOL(l_lock);
-EXPORT_SYMBOL(l_unlock);
-
 /* ldlm_lib.c */
 EXPORT_SYMBOL(client_import_add_conn);
 EXPORT_SYMBOL(client_import_del_conn);
index 9a693e3..f8c10fd 100644 (file)
@@ -35,7 +35,7 @@
 
 static inline int
 ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                        int send_cbs)
+                        struct list_head *work_list)
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
@@ -54,12 +54,12 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                 if (lockmode_compat(lock->l_req_mode, req_mode))
                         continue;
 
-                if (!send_cbs)
+                if (!work_list)
                         RETURN(0);
 
                 compat = 0;
                 if (lock->l_blocking_ast)
-                        ldlm_add_ast_work_item(lock, req, NULL, 0);
+                        ldlm_add_ast_work_item(lock, req, work_list);
         }
 
         RETURN(compat);
@@ -75,7 +75,7 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
  *   - the caller has NOT initialized req->lr_tmp, so we must
  *   - must call this function with the ns lock held once */
 int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
-                            ldlm_error_t *err)
+                            ldlm_error_t *err, struct list_head *work_list)
 {
         struct ldlm_resource *res = lock->l_resource;
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
@@ -85,25 +85,22 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
         LASSERT(list_empty(&res->lr_converting));
 
         if (!first_enq) {
-                LASSERT(res->lr_tmp != NULL);
-                rc = ldlm_plain_compat_queue(&res->lr_granted, lock, 0);
+                LASSERT(work_list != NULL);
+                rc = ldlm_plain_compat_queue(&res->lr_granted, lock, NULL);
                 if (!rc)
                         RETURN(LDLM_ITER_STOP);
-                rc = ldlm_plain_compat_queue(&res->lr_waiting, lock, 0);
+                rc = ldlm_plain_compat_queue(&res->lr_waiting, lock, NULL);
                 if (!rc)
                         RETURN(LDLM_ITER_STOP);
 
                 ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL, 0, 1);
+                ldlm_grant_lock(lock, work_list);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
  restart:
-        LASSERT(res->lr_tmp == NULL);
-        res->lr_tmp = &rpc_list;
-        rc = ldlm_plain_compat_queue(&res->lr_granted, lock, 1);
-        rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, 1);
-        res->lr_tmp = NULL;
+        rc = ldlm_plain_compat_queue(&res->lr_granted, lock, &rpc_list);
+        rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, &rpc_list);
 
         if (rc != 2) {
                 /* If either of the compat_queue()s returned 0, then we
@@ -114,15 +111,15 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                 if (list_empty(&lock->l_res_link))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
-                l_unlock(&res->lr_namespace->ns_lock);
-                rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
-                l_lock(&res->lr_namespace->ns_lock);
+                unlock_res(res);
+                rc = ldlm_run_bl_ast_work(&rpc_list);
+                lock_res(res);
                 if (rc == -ERESTART)
                         GOTO(restart, -ERESTART);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
         } else {
                 ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL, 0, 0);
+                ldlm_grant_lock(lock, NULL);
         }
         RETURN(0);
 }
index 3bc90b0..2a9d8a8 100644 (file)
@@ -177,6 +177,8 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         ldlm_lock_addref_internal(lock, mode);
         ldlm_lock2handle(lock, lockh);
         lock->l_flags |= LDLM_FL_LOCAL;
+        if (*flags & LDLM_FL_ATOMIC_CB)
+                lock->l_flags |= LDLM_FL_ATOMIC_CB;
         lock->l_lvb_swabber = lvb_swabber;
         if (policy != NULL)
                 memcpy(&lock->l_policy_data, policy, sizeof(*policy));
@@ -212,10 +214,10 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
                                 struct lustre_handle *lockh, int mode)
 {
         /* Set a flag to prevent us from sending a CANCEL (bug 407) */
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
         lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+        unlock_res(lock->l_resource);
         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
-        l_unlock(&ns->ns_lock);
 
         ldlm_lock_decref_and_cancel(lockh, mode);
 
@@ -400,9 +402,9 @@ int ldlm_cli_enqueue(struct obd_export *exp,
         }
 
         if ((*flags) & LDLM_FL_AST_SENT) {
-                l_lock(&ns->ns_lock);
+                lock_res(lock->l_resource);
                 lock->l_flags |= LDLM_FL_CBPENDING;
-                l_unlock(&ns->ns_lock);
+                unlock_res(lock->l_resource);
                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
         }
 
@@ -571,11 +573,11 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
 
                 LDLM_DEBUG(lock, "client-side cancel");
                 /* Set this flag to prevent others from getting new references*/
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
+                lock_res(lock->l_resource);
                 lock->l_flags |= LDLM_FL_CBPENDING;
                 local_only = lock->l_flags & LDLM_FL_LOCAL_ONLY;
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 ldlm_cancel_callback(lock);
+                unlock_res(lock->l_resource);
 
                 if (local_only) {
                         CDEBUG(D_INFO, "not sending request (at caller's "
@@ -658,17 +660,25 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
         sync = LDLM_SYNC; /* force to be sync in user space */
 #endif
 
-        l_lock(&ns->ns_lock);
+        spin_lock(&ns->ns_unused_lock);
         count = ns->ns_nr_unused - ns->ns_max_unused;
 
         if (count <= 0) {
-                l_unlock(&ns->ns_lock);
+                spin_unlock(&ns->ns_unused_lock);
                 RETURN(0);
         }
 
-        list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru) {
+        while (!list_empty(&ns->ns_unused_list)) {
+                struct list_head *tmp = ns->ns_unused_list.next;
+                lock = list_entry(tmp, struct ldlm_lock, l_lru);
                 LASSERT(!lock->l_readers && !lock->l_writers);
 
+                LDLM_LOCK_GET(lock); /* dropped by bl thread */
+                spin_unlock(&ns->ns_unused_lock);
+
+                lock_res(lock->l_resource);
+                ldlm_lock_remove_from_lru(lock);
+
                 /* Setting the CBPENDING flag is a little misleading, but
                  * prevents an important race; namely, once CBPENDING is set,
                  * the lock can accumulate no more readers/writers.  Since
@@ -676,9 +686,6 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
                  * won't see this flag and call l_blocking_ast */
                 lock->l_flags |= LDLM_FL_CBPENDING;
 
-                LDLM_LOCK_GET(lock); /* dropped by bl thread */
-                ldlm_lock_remove_from_lru(lock);
-
                 /* We can't re-add to l_lru as it confuses the refcounting in
                  * ldlm_lock_remove_from_lru() if an AST arrives after we drop
                  * ns_lock below. We use l_tmp and can't use l_pending_chain as
@@ -687,10 +694,14 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
                 if (sync != LDLM_ASYNC || ldlm_bl_to_thread(ns, NULL, lock))                        
                         list_add(&lock->l_tmp, &cblist);
 
+                unlock_res(lock->l_resource);
+
+                spin_lock(&ns->ns_unused_lock);
+
                 if (--count == 0)
                         break;
         }
-        l_unlock(&ns->ns_lock);
+        spin_unlock(&ns->ns_unused_lock);
 
         list_for_each_entry_safe(lock, next, &cblist, l_tmp) {
                 list_del_init(&lock->l_tmp);
@@ -704,9 +715,9 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                                            struct ldlm_res_id res_id, int flags,
                                            void *opaque)
 {
-        struct ldlm_resource *res;
         struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
-        struct ldlm_ast_work *w;
+        struct ldlm_resource *res;
+        struct ldlm_lock *lock;
         ENTRY;
 
         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
@@ -716,9 +727,8 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                 RETURN(0);
         }
 
-        l_lock(&ns->ns_lock);
+        lock_res(res);
         list_for_each(tmp, &res->lr_granted) {
-                struct ldlm_lock *lock;
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (opaque != NULL && lock->l_ast_data != opaque) {
@@ -738,31 +748,27 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                 /* See CBPENDING comment in ldlm_cancel_lru */
                 lock->l_flags |= LDLM_FL_CBPENDING;
 
-                OBD_ALLOC(w, sizeof(*w));
-                LASSERT(w);
-
-                w->w_lock = LDLM_LOCK_GET(lock);
-
-                list_add(&w->w_list, &list);
+                LASSERT(list_empty(&lock->l_bl_ast));
+                list_add(&lock->l_bl_ast, &list);
+                LDLM_LOCK_GET(lock);
         }
-        l_unlock(&ns->ns_lock);
+        unlock_res(res);
 
         list_for_each_safe(tmp, next, &list) {
                 struct lustre_handle lockh;
                 int rc;
-                w = list_entry(tmp, struct ldlm_ast_work, w_list);
+                lock = list_entry(tmp, struct ldlm_lock, l_bl_ast);
 
                 if (flags & LDLM_FL_LOCAL_ONLY) {
-                        ldlm_lock_cancel(w->w_lock);
+                        ldlm_lock_cancel(lock);
                 } else {
-                        ldlm_lock2handle(w->w_lock, &lockh);
+                        ldlm_lock2handle(lock, &lockh);
                         rc = ldlm_cli_cancel(&lockh);
                         if (rc != ELDLM_OK)
                                 CERROR("ldlm_cli_cancel: %d\n", rc);
                 }
-                list_del(&w->w_list);
-                LDLM_LOCK_PUT(w->w_lock);
-                OBD_FREE(w, sizeof(*w));
+                list_del_init(&lock->l_bl_ast);
+                LDLM_LOCK_PUT(lock);
         }
 
         ldlm_resource_putref(res);
@@ -774,10 +780,10 @@ static inline int have_no_nsresource(struct ldlm_namespace *ns)
 {
         int no_resource = 0;
 
-        spin_lock(&ns->ns_counter_lock);
+        spin_lock(&ns->ns_hash_lock);
         if (ns->ns_resources == 0)
                 no_resource = 1;
-        spin_unlock(&ns->ns_counter_lock);
+        spin_unlock(&ns->ns_hash_lock);
 
         RETURN(no_resource);
 }
@@ -805,15 +811,17 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                 RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
                                                        opaque));
 
-        l_lock(&ns->ns_lock);
+        spin_lock(&ns->ns_hash_lock);
         for (i = 0; i < RES_HASH_SIZE; i++) {
-                struct list_head *tmp, *next;
-                list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
-                        int rc;
+                struct list_head *tmp;
+                tmp = ns->ns_hash[i].next;
+                while (tmp != &(ns->ns_hash[i])) {
                         struct ldlm_resource *res;
+                        int rc;
+
                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
                         ldlm_resource_getref(res);
-                        l_unlock(&ns->ns_lock);
+                        spin_unlock(&ns->ns_hash_lock);
 
                         rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
                                                              flags, opaque);
@@ -821,12 +829,13 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                                 CERROR("cancel_unused_res ("LPU64"): %d\n",
                                        res->lr_name.name[0], rc);
 
-                        l_lock(&ns->ns_lock);
-                        next = tmp->next;
-                        ldlm_resource_putref(res);
+                        spin_lock(&ns->ns_hash_lock);
+                        tmp = tmp->next;
+                        ldlm_resource_putref_locked(res);
                 }
         }
-        l_unlock(&ns->ns_lock);
+        spin_unlock(&ns->ns_hash_lock);
+
         if (flags & LDLM_FL_CONFIG_CHANGE)
                 l_wait_event(ns->ns_waitq, have_no_nsresource(ns), &lwi);
 
@@ -841,14 +850,13 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
         struct list_head *tmp, *next;
         struct ldlm_lock *lock;
         int rc = LDLM_ITER_CONTINUE;
-        struct ldlm_namespace *ns = res->lr_namespace;
 
         ENTRY;
 
         if (!res)
                 RETURN(LDLM_ITER_CONTINUE);
 
-        l_lock(&ns->ns_lock);
+        lock_res(res);
         list_for_each_safe(tmp, next, &res->lr_granted) {
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
@@ -870,7 +878,7 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
                         GOTO(out, rc = LDLM_ITER_STOP);
         }
  out:
-        l_unlock(&ns->ns_lock);
+        unlock_res(res);
         RETURN(rc);
 }
 
@@ -901,23 +909,28 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
                                ldlm_res_iterator_t iter, void *closure)
 {
         int i, rc = LDLM_ITER_CONTINUE;
+        struct ldlm_resource *res;
+        struct list_head *tmp;
 
-        l_lock(&ns->ns_lock);
+        spin_lock(&ns->ns_hash_lock);
         for (i = 0; i < RES_HASH_SIZE; i++) {
-                struct list_head *tmp, *next;
-                list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
-                        struct ldlm_resource *res =
-                                list_entry(tmp, struct ldlm_resource, lr_hash);
-
+                tmp = ns->ns_hash[i].next;
+                while (tmp != &(ns->ns_hash[i])) {
+                        res = list_entry(tmp, struct ldlm_resource, lr_hash);
                         ldlm_resource_getref(res);
+                        spin_unlock(&ns->ns_hash_lock);
+
                         rc = iter(res, closure);
-                        ldlm_resource_putref(res);
+
+                        spin_lock(&ns->ns_hash_lock);
+                        tmp = tmp->next;
+                        ldlm_resource_putref_locked(res);
                         if (rc == LDLM_ITER_STOP)
                                 GOTO(out, rc);
                 }
         }
  out:
-        l_unlock(&ns->ns_lock);
+        spin_unlock(&ns->ns_hash_lock);
         RETURN(rc);
 }
 
@@ -941,9 +954,7 @@ void ldlm_change_cbdata(struct ldlm_namespace *ns,
                 return;
         }
 
-        l_lock(&ns->ns_lock);
         ldlm_resource_foreach(res, iter, data);
-        l_unlock(&ns->ns_lock);
         ldlm_resource_putref(res);
         EXIT;
 }
@@ -1074,7 +1085,6 @@ int ldlm_replay_locks(struct obd_import *imp)
         /* ensure this doesn't fall to 0 before all have been queued */
         atomic_inc(&imp->imp_replay_inflight);
 
-        l_lock(&ns->ns_lock);
         (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
 
         list_for_each_safe(pos, next, &list) {
@@ -1083,7 +1093,6 @@ int ldlm_replay_locks(struct obd_import *imp)
                 if (rc)
                         break; /* or try to do the rest? */
         }
-        l_unlock(&ns->ns_lock);
 
         atomic_dec(&imp->imp_replay_inflight);
 
index 9cd393a..18e93f3 100644 (file)
@@ -236,11 +236,10 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
         strcpy(ns->ns_name, name);
 
         INIT_LIST_HEAD(&ns->ns_root_list);
-        l_lock_init(&ns->ns_lock);
         ns->ns_refcount = 0;
         ns->ns_client = client;
-        spin_lock_init(&ns->ns_counter_lock);
-        ns->ns_locks = 0;
+        spin_lock_init(&ns->ns_hash_lock);
+        atomic_set(&ns->ns_locks, 0);
         ns->ns_resources = 0;
         init_waitqueue_head(&ns->ns_waitq);
 
@@ -251,6 +250,7 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
         INIT_LIST_HEAD(&ns->ns_unused_list);
         ns->ns_nr_unused = 0;
         ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
+        spin_lock_init(&ns->ns_unused_lock);
 
         down(&ldlm_namespace_lock);
         list_add(&ns->ns_list_chain, &ldlm_namespace_list);
@@ -280,15 +280,33 @@ extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
 static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
                              int flags)
 {
-        struct list_head *tmp, *pos;
+        struct list_head *tmp;
         int rc = 0, client = res->lr_namespace->ns_client;
         int local_only = (flags & LDLM_FL_LOCAL_ONLY);
         ENTRY;
 
-        list_for_each_safe(tmp, pos, q) {
-                struct ldlm_lock *lock;
-                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-                LDLM_LOCK_GET(lock);
+        
+        do {
+                struct ldlm_lock *lock = NULL;
+                /* first, we look for non-cleaned-yet lock
+                 * all cleaned locks are marked by CLEANED flag */
+                lock_res(res);
+                list_for_each(tmp, q) {
+                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                        if (lock->l_flags & LDLM_FL_CLEANED) {
+                                lock = NULL;
+                                continue;
+                        }
+                        LDLM_LOCK_GET(lock);
+                        lock->l_flags |= LDLM_FL_CLEANED;
+                        break;
+                }
+                
+                if (lock == NULL) {
+                        unlock_res(res);
+                        break;
+                }
 
                 /* Set CBPENDING so nothing in the cancellation path
                  * can match this lock */
@@ -303,6 +321,7 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
                          * will go away ... */
                         /* ... without sending a CANCEL message. */
                         lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+                        unlock_res(res);
                         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
                         if (lock->l_completion_ast)
                                 lock->l_completion_ast(lock, 0, NULL);
@@ -312,6 +331,8 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
 
                 if (client) {
                         struct lustre_handle lockh;
+
+                        unlock_res(res);
                         ldlm_lock2handle(lock, &lockh);
                         if (!local_only) {
                                 rc = ldlm_cli_cancel(&lockh);
@@ -322,19 +343,21 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
                         if (local_only || rc != ELDLM_OK)
                                 ldlm_lock_cancel(lock);
                 } else {
+                        ldlm_resource_unlink_lock(lock);
+                        unlock_res(res);
                         LDLM_DEBUG(lock, "Freeing a lock still held by a "
                                    "client node");
-
-                        ldlm_resource_unlink_lock(lock);
                         ldlm_lock_destroy(lock);
                 }
                 LDLM_LOCK_PUT(lock);
-        }
+        } while (1);
+
         EXIT;
 }
 
 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags)
 {
+        struct list_head *tmp;
         int i;
 
         if (ns == NULL) {
@@ -342,34 +365,39 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags)
                 return ELDLM_OK;
         }
 
-        l_lock(&ns->ns_lock);
+        /* FIXME: protect by ns_hash_lock -bzzz */
         for (i = 0; i < RES_HASH_SIZE; i++) {
-                struct list_head *tmp, *pos;
-                list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+                spin_lock(&ns->ns_hash_lock);
+                tmp = ns->ns_hash[i].next;
+                while (tmp != &(ns->ns_hash[i])) {
                         struct ldlm_resource *res;
                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
+                        spin_unlock(&ns->ns_hash_lock);
                         ldlm_resource_getref(res);
 
                         cleanup_resource(res, &res->lr_granted, flags);
                         cleanup_resource(res, &res->lr_converting, flags);
                         cleanup_resource(res, &res->lr_waiting, flags);
 
+                        spin_lock(&ns->ns_hash_lock);
+                        tmp  = tmp->next;
+
                         /* XXX what a mess: don't force cleanup if we're
                          * local_only (which is only used by recovery).  In that
                          * case, we probably still have outstanding lock refs
                          * which reference these resources. -phil */
-                        if (!ldlm_resource_putref(res) &&
+                        if (!ldlm_resource_putref_locked(res) &&
                             !(flags & LDLM_FL_LOCAL_ONLY)) {
                                 CERROR("Resource refcount nonzero (%d) after "
                                        "lock cleanup; forcing cleanup.\n",
                                        atomic_read(&res->lr_refcount));
                                 ldlm_resource_dump(D_ERROR, res);
                                 atomic_set(&res->lr_refcount, 1);
-                                ldlm_resource_putref(res);
+                                ldlm_resource_putref_locked(res);
                         }
                 }
+                spin_unlock(&ns->ns_hash_lock);
         }
-        l_unlock(&ns->ns_lock);
 
         return ELDLM_OK;
 }
@@ -438,20 +466,43 @@ static struct ldlm_resource *ldlm_resource_new(void)
         INIT_LIST_HEAD(&res->lr_granted);
         INIT_LIST_HEAD(&res->lr_converting);
         INIT_LIST_HEAD(&res->lr_waiting);
-        sema_init(&res->lr_lvb_sem, 1);
         atomic_set(&res->lr_refcount, 1);
+        spin_lock_init(&res->lr_lock);
+
+        /* The creator of the resource must unlock
+         * the semaphore after LVB initialization. */
+        init_MUTEX_LOCKED(&res->lr_lvb_sem);
 
         return res;
 }
 
+/* must be called with hash lock held */
+static struct ldlm_resource *
+ldlm_resource_find(struct ldlm_namespace *ns, struct ldlm_res_id name, __u32 hash)
+{
+        struct list_head *bucket, *tmp;
+        struct ldlm_resource *res;
+
+        LASSERT_SPIN_LOCKED(&ns->ns_hash_lock);
+        bucket = ns->ns_hash + hash;
+
+        list_for_each(tmp, bucket) {
+                res = list_entry(tmp, struct ldlm_resource, lr_hash);
+                if (memcmp(&res->lr_name, &name, sizeof(res->lr_name)) == 0)
+                        return res;
+        }
+
+        return NULL;
+}
+
 /* Args: locked namespace
  * Returns: newly-allocated, referenced, unlocked resource */
 static struct ldlm_resource *
 ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent,
-                  struct ldlm_res_id name, __u32 type)
+                  struct ldlm_res_id name, __u32 hash, __u32 type)
 {
         struct list_head *bucket;
-        struct ldlm_resource *res;
+        struct ldlm_resource *res, *old_res;
         ENTRY;
 
         LASSERTF(type >= LDLM_MIN_TYPE && type <= LDLM_MAX_TYPE,
@@ -461,20 +512,31 @@ ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent,
         if (!res)
                 RETURN(NULL);
 
-        spin_lock(&ns->ns_counter_lock);
-        ns->ns_resources++;
-        spin_unlock(&ns->ns_counter_lock);
-
-        l_lock(&ns->ns_lock);
         memcpy(&res->lr_name, &name, sizeof(res->lr_name));
         res->lr_namespace = ns;
-        ns->ns_refcount++;
-
         res->lr_type = type;
         res->lr_most_restr = LCK_NL;
 
-        bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
+        spin_lock(&ns->ns_hash_lock);
+        old_res = ldlm_resource_find(ns, name, hash);
+        if (old_res) {
+                /* someone won the race and added the resource before */
+                ldlm_resource_getref(old_res);
+                spin_unlock(&ns->ns_hash_lock);
+                OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+                /* wait for the creator to finish LVB initialization */
+                if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
+                        down(&old_res->lr_lvb_sem);
+                        up(&old_res->lr_lvb_sem);
+                }
+                RETURN(old_res);
+        }
+
+        /* we won! let's add the resource */
+        bucket = ns->ns_hash + hash;
         list_add(&res->lr_hash, bucket);
+        ns->ns_resources++;
+        ns->ns_refcount++;
 
         if (parent == NULL) {
                 list_add(&res->lr_childof, &ns->ns_root_list);
@@ -482,8 +544,19 @@ ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent,
                 res->lr_parent = parent;
                 list_add(&res->lr_childof, &parent->lr_children);
         }
-        l_unlock(&ns->ns_lock);
+        spin_unlock(&ns->ns_hash_lock);
 
+        if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
+                int rc;
+
+                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
+                rc = ns->ns_lvbo->lvbo_init(res);
+                if (rc)
+                        CERROR("lvbo_init failed for resource "
+                              LPU64": rc %d\n", name.name[0], rc);
+                /* the resource was created with lr_lvb_sem held */
+                up(&res->lr_lvb_sem);
+        }
 
         RETURN(res);
 }
@@ -495,55 +568,32 @@ struct ldlm_resource *
 ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
                   struct ldlm_res_id name, __u32 type, int create)
 {
+        __u32 hash = ldlm_hash_fn(parent, name);
         struct ldlm_resource *res = NULL;
-        struct list_head *bucket, *tmp;
         ENTRY;
 
         LASSERT(ns != NULL);
         LASSERT(ns->ns_hash != NULL);
         LASSERT(name.name[0] != 0);
 
-        l_lock(&ns->ns_lock);
-        bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
-
-        list_for_each(tmp, bucket) {
-                res = list_entry(tmp, struct ldlm_resource, lr_hash);
-
-                if (memcmp(&res->lr_name, &name, sizeof(res->lr_name)) == 0) {
-                        ldlm_resource_getref(res);
-                        l_unlock(&ns->ns_lock);
-                        RETURN(res);
+        spin_lock(&ns->ns_hash_lock);
+        res = ldlm_resource_find(ns, name, hash);
+        if (res) {
+                ldlm_resource_getref(res);
+                spin_unlock(&ns->ns_hash_lock);
+                /* wait for the creator to finish LVB initialization */
+                if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
+                        down(&res->lr_lvb_sem);
+                        up(&res->lr_lvb_sem);
                 }
+                RETURN(res);
         }
+        spin_unlock(&ns->ns_hash_lock);
 
-        if (create) {
-                res = ldlm_resource_add(ns, parent, name, type);
-                if (res == NULL)
-                        GOTO(out, NULL);
-        } else {
-                res = NULL;
-        }
-
-        if (create && ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
-                int rc;
-
-                /* Although this is technically a lock inversion risk (lvb_sem
-                 * should be taken before DLM lock), this resource was just
-                 * created, so nobody else can take the lvb_sem yet. -p */
-                down(&res->lr_lvb_sem);
-                /* Drop the dlm lock, because lvbo_init can touch the disk */
-                l_unlock(&ns->ns_lock);
-                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
-                rc = ns->ns_lvbo->lvbo_init(res);
-                up(&res->lr_lvb_sem);
-                if (rc)
-                        CERROR("lvbo_init failed for resource "
-                              LPU64": rc %d\n", name.name[0], rc);
-        } else {
-out:
-                l_unlock(&ns->ns_lock);
-        }
+        if (create == 0)
+                RETURN(NULL);
 
+        res = ldlm_resource_add(ns, parent, name, hash, type);
         RETURN(res);
 }
 
@@ -557,6 +607,60 @@ struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res)
         return res;
 }
 
+int __ldlm_resource_putref_final(struct ldlm_resource *res, int locked)
+{
+        struct ldlm_namespace *ns = res->lr_namespace;
+        ENTRY;
+
+        if (!locked)
+                spin_lock(&ns->ns_hash_lock);
+
+        if (atomic_read(&res->lr_refcount) != 0) {
+                /* We lost the race. */
+                if (!locked)
+                        spin_unlock(&ns->ns_hash_lock);
+                RETURN(0);
+        }
+
+        if (!list_empty(&res->lr_granted)) {
+                ldlm_resource_dump(D_ERROR, res);
+                LBUG();
+        }
+
+        if (!list_empty(&res->lr_converting)) {
+                ldlm_resource_dump(D_ERROR, res);
+                LBUG();
+        }
+
+        if (!list_empty(&res->lr_waiting)) {
+                ldlm_resource_dump(D_ERROR, res);
+                LBUG();
+        }
+
+        if (!list_empty(&res->lr_children)) {
+                ldlm_resource_dump(D_ERROR, res);
+                LBUG();
+        }
+
+        ns->ns_refcount--;
+        list_del_init(&res->lr_hash);
+        list_del_init(&res->lr_childof);
+
+        ns->ns_resources--;
+        if (ns->ns_resources == 0)
+                wake_up(&ns->ns_waitq);
+
+        if (!locked)
+                spin_unlock(&ns->ns_hash_lock);
+
+        /* we just unhashed the resource, nobody should find it */
+        LASSERT(atomic_read(&res->lr_refcount) == 0);
+        if (res->lr_lvb_data)
+                OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
+        OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+        RETURN(1);
+}
+
 /* Returns 1 if the resource was freed, 0 if it remains. */
 int ldlm_resource_putref(struct ldlm_resource *res)
 {
@@ -568,56 +672,27 @@ int ldlm_resource_putref(struct ldlm_resource *res)
         LASSERT(atomic_read(&res->lr_refcount) > 0);
         LASSERT(atomic_read(&res->lr_refcount) < LI_POISON);
 
-        if (atomic_dec_and_test(&res->lr_refcount)) {
-                struct ldlm_namespace *ns = res->lr_namespace;
-                ENTRY;
-
-                l_lock(&ns->ns_lock);
-
-                if (atomic_read(&res->lr_refcount) != 0) {
-                        /* We lost the race. */
-                        l_unlock(&ns->ns_lock);
-                        RETURN(rc);
-                }
-
-                if (!list_empty(&res->lr_granted)) {
-                        ldlm_resource_dump(D_ERROR, res);
-                        LBUG();
-                }
+        LASSERT(atomic_read(&res->lr_refcount) >= 0);
+        if (atomic_dec_and_test(&res->lr_refcount))
+                rc = __ldlm_resource_putref_final(res, 0);
 
-                if (!list_empty(&res->lr_converting)) {
-                        ldlm_resource_dump(D_ERROR, res);
-                        LBUG();
-                }
-
-                if (!list_empty(&res->lr_waiting)) {
-                        ldlm_resource_dump(D_ERROR, res);
-                        LBUG();
-                }
-
-                if (!list_empty(&res->lr_children)) {
-                        ldlm_resource_dump(D_ERROR, res);
-                        LBUG();
-                }
-
-                ns->ns_refcount--;
-                list_del_init(&res->lr_hash);
-                list_del_init(&res->lr_childof);
-                if (res->lr_lvb_data)
-                        OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
-                l_unlock(&ns->ns_lock);
+        RETURN(rc);
+}
 
-                OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+/* Returns 1 if the resource was freed, 0 if it remains. */
+int ldlm_resource_putref_locked(struct ldlm_resource *res)
+{
+        int rc = 0;
+        ENTRY;
 
-                spin_lock(&ns->ns_counter_lock);
-                ns->ns_resources--;
-                if (ns->ns_resources == 0)
-                        wake_up(&ns->ns_waitq);
-                spin_unlock(&ns->ns_counter_lock);
+        CDEBUG(D_INFO, "putref res: %p count: %d\n", res,
+               atomic_read(&res->lr_refcount) - 1);
+        LASSERT(atomic_read(&res->lr_refcount) > 0);
+        LASSERT(atomic_read(&res->lr_refcount) < LI_POISON);
 
-                rc = 1;
-                EXIT;
-        }
+        LASSERT(atomic_read(&res->lr_refcount) >= 0);
+        if (atomic_dec_and_test(&res->lr_refcount))
+                rc = __ldlm_resource_putref_final(res, 1);
 
         RETURN(rc);
 }
@@ -625,7 +700,7 @@ int ldlm_resource_putref(struct ldlm_resource *res)
 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
                             struct ldlm_lock *lock)
 {
-        l_lock(&res->lr_namespace->ns_lock);
+        check_res_locked(res);
 
         ldlm_resource_dump(D_OTHER, res);
         CDEBUG(D_OTHER, "About to add this lock:\n");
@@ -633,14 +708,12 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
 
         if (lock->l_destroyed) {
                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
-                goto out;
+                return;
         }
 
         LASSERT(list_empty(&lock->l_res_link));
 
         list_add_tail(&lock->l_res_link, head);
- out:
-        l_unlock(&res->lr_namespace->ns_lock);
 }
 
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
@@ -648,7 +721,7 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
 {
         struct ldlm_resource *res = original->l_resource;
 
-        l_lock(&res->lr_namespace->ns_lock);
+        check_res_locked(res);
 
         ldlm_resource_dump(D_OTHER, res);
         CDEBUG(D_OTHER, "About to insert this lock after %p:\n", original);
@@ -656,21 +729,17 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
 
         if (new->l_destroyed) {
                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
-                goto out;
+                return;
         }
 
         LASSERT(list_empty(&new->l_res_link));
-
         list_add(&new->l_res_link, &original->l_res_link);
- out:
-        l_unlock(&res->lr_namespace->ns_lock);
 }
 
 void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
 {
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        check_res_locked(lock->l_resource);
         list_del_init(&lock->l_res_link);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 }
 
 void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
@@ -701,19 +770,28 @@ void ldlm_namespace_dump(int level, struct ldlm_namespace *ns)
         CDEBUG(level, "--- Namespace: %s (rc: %d, client: %d)\n",
                ns->ns_name, ns->ns_refcount, ns->ns_client);
 
-        l_lock(&ns->ns_lock);
-        if (time_after(jiffies, ns->ns_next_dump)) {
-                list_for_each(tmp, &ns->ns_root_list) {
-                        struct ldlm_resource *res;
-                        res = list_entry(tmp, struct ldlm_resource, lr_childof);
-
-                        /* Once we have resources with children, this should
-                         * really dump them recursively. */
-                        ldlm_resource_dump(level, res);
-                }
-                ns->ns_next_dump = jiffies + 10 * HZ;
+        if (time_before(jiffies, ns->ns_next_dump))
+                return;
+
+        spin_lock(&ns->ns_hash_lock);
+        tmp = ns->ns_root_list.next;
+        while (tmp != &ns->ns_root_list) {
+                struct ldlm_resource *res;
+                res = list_entry(tmp, struct ldlm_resource, lr_childof);
+
+                ldlm_resource_getref(res);
+                spin_unlock(&ns->ns_hash_lock);
+
+                lock_res(res);
+                ldlm_resource_dump(level, res);
+                unlock_res(res);
+                
+                spin_lock(&ns->ns_hash_lock);
+                tmp = tmp->next;
+                ldlm_resource_putref_locked(res);
         }
-        l_unlock(&ns->ns_lock);
+        ns->ns_next_dump = jiffies + 10 * HZ;
+        spin_unlock(&ns->ns_hash_lock);
 }
 
 void ldlm_resource_dump(int level, struct ldlm_resource *res)
index 8ac42b3..6fc869f 100644 (file)
@@ -805,8 +805,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                         goto iput;
                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
 
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 down(&lli->lli_size_sem);
+                lock_res(lock->l_resource);
                 kms = ldlm_extent_shift_kms(lock,
                                             lsm->lsm_oinfo[stripe].loi_kms);
                
@@ -814,8 +814,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
                 lsm->lsm_oinfo[stripe].loi_kms = kms;
+                unlock_res(lock->l_resource);
                 up(&lli->lli_size_sem);
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 //ll_try_done_writing(inode);
         iput:
                 iput(inode);
@@ -861,16 +861,16 @@ int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lvb = lock->l_lvb_data;
                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
 
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 down(&inode->i_sem);
+                lock_res(lock->l_resource);
                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                 kms = ldlm_extent_shift_kms(NULL, kms);
                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
                 lsm->lsm_oinfo[stripe].loi_kms = kms;
+                unlock_res(lock->l_resource);
                 up(&inode->i_sem);
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         }
 
 iput:
index 8aeb2db..8e52be4 100644 (file)
@@ -977,7 +977,9 @@ int ll_process_config_update(struct ll_sb_info *sbi, int clean)
 struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
 {
         struct inode *inode = NULL;
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
+        /* NOTE: we depend on atomic igrab() -bzzz */
+        lock_res(lock->l_resource);
         if (lock->l_ast_data) {
                 struct ll_inode_info *lli = ll_i2info(lock->l_ast_data);
                 if (lli->lli_inode_magic == LLI_INODE_MAGIC) {
@@ -990,7 +992,7 @@ struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
                         inode = NULL;
                 }
         }
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
         return inode;
 }
 
index 009df55..96b87f0 100644 (file)
@@ -138,7 +138,7 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *l, void *data)
         lock = ldlm_handle2lock(lockh);
 
         LASSERT(lock != NULL);
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
 #ifdef __KERNEL__
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -152,7 +152,7 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *l, void *data)
         }
 #endif
         lock->l_ast_data = data;
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
         LDLM_LOCK_PUT(lock);
 
         EXIT;
index 54bdddb..c2ead6b 100644 (file)
@@ -243,7 +243,7 @@ struct dentry *mds_id2locked_dentry(struct obd_device *obd, struct lustre_id *id
         struct dentry *de = mds_id2dentry(obd, id, mnt), *retval = de;
         ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
         struct ldlm_res_id res_id = { .name = {0} };
-        int flags = 0, rc;
+        int flags = LDLM_FL_ATOMIC_CB, rc;
         ENTRY;
 
         if (IS_ERR(de))
@@ -271,7 +271,7 @@ struct dentry *mds_id2locked_dentry(struct obd_device *obd, struct lustre_id *id
                                 RETURN(ERR_PTR(-ENOLCK));
                         }
                 }
-                flags = 0;
+                flags = LDLM_FL_ATOMIC_CB;
 
                 res_id.name[2] = full_name_hash((unsigned char *)name, namelen);
 
@@ -804,7 +804,7 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         }
 
         /* XXX layering violation!  -phil */
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
         
         /*
          * get this: if mds_blocking_ast is racing with mds_intent_policy, such
@@ -813,13 +813,13 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
          * blocking function anymore.  So check, and return early, if so.
          */
         if (lock->l_blocking_ast != mds_blocking_ast) {
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                unlock_res(lock->l_resource);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 
         if (do_ast) {
                 struct lustre_handle lockh;
@@ -2229,8 +2229,9 @@ static int mdt_obj_create(struct ptlrpc_request *req)
                 if (!IS_ERR(new) && new->d_inode) {
                         struct lustre_id sid;
                                 
-                        CWARN("mkdir() repairing is on its way: %lu/%lu\n",
-                              (unsigned long)id_ino(&id), (unsigned long)id_gen(&id));
+                        CDEBUG(D_OTHER, "mkdir repairing %lu/%lu\n",
+                               (unsigned long)id_ino(&id),
+                               (unsigned long)id_gen(&id));
                         
                         obdo_from_inode(&repbody->oa, new->d_inode,
                                         FILTER_VALID_FLAGS);
@@ -2571,6 +2572,7 @@ static void mds_revoke_export_locks(struct obd_export *exp)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks;
+        struct list_head work;
         struct ldlm_lock *lock, *next;
         struct ldlm_lock_desc desc;
 
@@ -2578,20 +2580,31 @@ static void mds_revoke_export_locks(struct obd_export *exp)
                 return;
 
         ENTRY;
-        l_lock(&ns->ns_lock);
+        CERROR("implement right locking here! -bzzz\n");
+        INIT_LIST_HEAD(&work);
+        spin_lock(&exp->exp_ldlm_data.led_lock);
         list_for_each_entry_safe(lock, next, locklist, l_export_chain) {
-                if (lock->l_req_mode != lock->l_granted_mode)
+
+                lock_res(lock->l_resource);
+                if (lock->l_req_mode != lock->l_granted_mode) {
+                        unlock_res(lock->l_resource);
                         continue;
+                }
 
                 LASSERT(lock->l_resource);
                 if (lock->l_resource->lr_type != LDLM_IBITS &&
-                    lock->l_resource->lr_type != LDLM_PLAIN)
+                    lock->l_resource->lr_type != LDLM_PLAIN) {
+                        unlock_res(lock->l_resource);
                         continue;
+                }
 
-                if (lock->l_flags & LDLM_FL_AST_SENT)
+                if (lock->l_flags & LDLM_FL_AST_SENT) {
+                        unlock_res(lock->l_resource);
                         continue;
+                }
 
                 lock->l_flags |= LDLM_FL_AST_SENT;
+                unlock_res(lock->l_resource);
 
                 /* the desc just pretend to exclusive */
                 ldlm_lock2desc(lock, &desc);
@@ -2600,7 +2613,8 @@ static void mds_revoke_export_locks(struct obd_export *exp)
 
                 lock->l_blocking_ast(lock, &desc, NULL, LDLM_CB_BLOCKING);
         }
-        l_unlock(&ns->ns_lock);
+        spin_unlock(&exp->exp_ldlm_data.led_lock);
+
         EXIT;
 }
 
@@ -3766,7 +3780,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
                 return;
 
-        l_lock(&obd->obd_namespace->ns_lock);
+        spin_lock(&obd->obd_namespace->ns_hash_lock);
         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
                 struct ldlm_lock *lock;
                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
@@ -3779,11 +3793,11 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
                                   lockh->cookie);
                         if (old_lock)
                                 *old_lock = LDLM_LOCK_GET(lock);
-                        l_unlock(&obd->obd_namespace->ns_lock);
+                        spin_unlock(&obd->obd_namespace->ns_hash_lock);
                         return;
                 }
         }
-        l_unlock(&obd->obd_namespace->ns_lock);
+        spin_unlock(&obd->obd_namespace->ns_hash_lock);
 
         /* If the xid matches, then we know this is a resent request,
          * and allow it. (It's probably an OPEN, for which we don't
@@ -3981,13 +3995,16 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         }
 
         /* Fixup the lock to be given to the client */
-        l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
+        lock_res(new_lock->l_resource);
         new_lock->l_readers = 0;
         new_lock->l_writers = 0;
 
         new_lock->l_export = class_export_get(req->rq_export);
+
+        spin_lock(&new_lock->l_export->exp_ldlm_data.led_lock);
         list_add(&new_lock->l_export_chain,
                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
+        spin_unlock(&new_lock->l_export->exp_ldlm_data.led_lock);
 
         new_lock->l_blocking_ast = lock->l_blocking_ast;
         new_lock->l_completion_ast = lock->l_completion_ast;
@@ -3997,8 +4014,8 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
 
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
+        unlock_res(new_lock->l_resource);
         LDLM_LOCK_PUT(new_lock);
-        l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
 
         RETURN(ELDLM_LOCK_REPLACED);
 }
index 8072ef3..7910673 100644 (file)
@@ -874,7 +874,7 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
 {
         struct ldlm_res_id child_res_id = { .name = { inode->i_ino, 0, 1, 0 } };
         struct lustre_handle lockh;
-        int lock_flags = 0;
+        int lock_flags = LDLM_FL_ATOMIC_CB;
         int rc;
         ENTRY;
 
@@ -1311,7 +1311,7 @@ got_child:
         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
                 struct ldlm_res_id child_res_id = { .name = {0}};
                 ldlm_policy_data_t policy;
-                int lock_flags = 0;
+                int lock_flags = LDLM_FL_ATOMIC_CB;
                 
                 /* LOOKUP lock will protect dentry on client -bzzz */
                 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP |
index e571b9b..fa802cf 100644 (file)
@@ -1263,7 +1263,7 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
                res_id[0]->name[0], res_id[1]->name[0]);
 
-        flags = LDLM_FL_LOCAL_ONLY;
+        flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
                               LDLM_IBITS, policies[0], lock_modes[0], &flags,
                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
@@ -1277,7 +1277,7 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
                 ldlm_lock_addref(handles[1], lock_modes[1]);
         } else if (res_id[1]->name[0] != 0) {
-                flags = LDLM_FL_LOCAL_ONLY;
+                flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
                                       *res_id[1], LDLM_IBITS, policies[1],
                                       lock_modes[1], &flags, mds_blocking_ast,
@@ -1469,7 +1469,7 @@ changed:
         *dchildp = dchild = vchild;
 
         if (dchild->d_inode || (dchild->d_flags & DCACHE_CROSS_REF)) {
-                int flags = 0;
+                int flags = LDLM_FL_ATOMIC_CB;
                 
                 if (dchild->d_inode) {
                         down(&dchild->d_inode->i_sem);
@@ -1560,7 +1560,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
         if (name && IS_PDIROPS((*dparentp)->d_inode)) {
                 struct ldlm_res_id res_id = { .name = {0} };
                 ldlm_policy_data_t policy;
-                int flags = 0;
+                int flags = LDLM_FL_ATOMIC_CB;
 
                 *update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode);
                 if (*update_mode) {
@@ -2341,7 +2341,7 @@ static int mds_reint_link_acquire(struct mds_update_record *rec,
         int rc = 0, cleanup_phase = 0;
         struct dentry *de_src = NULL;
         ldlm_policy_data_t policy;
-        int flags = 0;
+        int flags = LDLM_FL_ATOMIC_CB;
         ENTRY;
 
         DEBUG_REQ(D_INODE, req, "%s: request to acquire i_nlinks "DLID4"\n",
@@ -2585,7 +2585,7 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         
 #ifdef S_PDIROPS
         if (IS_PDIROPS(de_tgt_dir->d_inode)) {
-                int flags = 0;
+                int flags = LDLM_FL_ATOMIC_CB;
                 update_mode = mds_lock_mode_for_dir(obd, de_tgt_dir, LCK_EX);
                 if (update_mode) {
                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
index 09ef3c2..007a40d 100644 (file)
@@ -490,6 +490,7 @@ struct obd_export *class_new_export(struct obd_device *obd)
         INIT_LIST_HEAD(&export->exp_outstanding_replies);
         /* XXX this should be in LDLM init */
         INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
+        spin_lock_init(&export->exp_ldlm_data.led_lock);
 
         INIT_LIST_HEAD(&export->exp_handle.h_link);
         class_handle_hash(&export->exp_handle, export_handle_addref);
index 77123c8..f9e4bf9 100644 (file)
@@ -1068,20 +1068,20 @@ static int filter_blocking_ast(struct ldlm_lock *lock,
         }
 
         /* XXX layering violation!  -phil */
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
          * such that filter_blocking_ast is called just before l_i_p takes the
          * ns_lock, then by the time we get the lock, we might not be the
          * correct blocking function anymore.  So check, and return early, if
          * so. */
         if (lock->l_blocking_ast != filter_blocking_ast) {
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                unlock_res(lock->l_resource);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 
         if (do_ast) {
                 struct lustre_handle lockh;
@@ -1308,25 +1308,24 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
         lock->l_req_mode = LCK_PR;
 
-        l_lock(&res->lr_namespace->ns_lock);
-
-        res->lr_tmp = &rpc_list;
-        rc = policy(lock, &tmpflags, 0, &err);
-        res->lr_tmp = NULL;
+        lock_res(res);
+        rc = policy(lock, &tmpflags, 0, &err, &rpc_list);
 
         /* FIXME: we should change the policy function slightly, to not make
          * this list at all, since we just turn around and free it */
         while (!list_empty(&rpc_list)) {
-                struct ldlm_ast_work *w =
-                        list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
-                list_del(&w->w_list);
-                LDLM_LOCK_PUT(w->w_lock);
-                OBD_FREE(w, sizeof(*w));
+                struct ldlm_lock *wlock =
+                        list_entry(rpc_list.next, struct ldlm_lock, l_cp_ast);
+                LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
+                LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
+                lock->l_flags &= ~LDLM_FL_CP_REQD;
+                list_del_init(&wlock->l_cp_ast);
+                LDLM_LOCK_PUT(wlock);
         }
 
         if (rc == LDLM_ITER_CONTINUE) {
                 /* The lock met with no resistance; we're finished. */
-                l_unlock(&res->lr_namespace->ns_lock);
+                unlock_res(res);
                 RETURN(ELDLM_LOCK_REPLACED);
         }
 
@@ -1334,11 +1333,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
          * policy nicely created a list of all PW locks for us.  We will choose
          * the highest of those which are larger than the size in the LVB, if
          * any, and perform a glimpse callback. */
-        down(&res->lr_lvb_sem);
         res_lvb = res->lr_lvb_data;
         LASSERT(res_lvb != NULL);
         *reply_lvb = *res_lvb;
-        up(&res->lr_lvb_sem);
 
         list_for_each(tmp, &res->lr_granted) {
                 struct ldlm_lock *tmplock =
@@ -1362,7 +1359,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                 LDLM_LOCK_PUT(l);
                 l = LDLM_LOCK_GET(tmplock);
         }
-        l_unlock(&res->lr_namespace->ns_lock);
+        unlock_res(res);
 
         /* There were no PW locks beyond the size in the LVB; finished. */
         if (l == NULL)
@@ -1382,9 +1379,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                 res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
         }
 
-        down(&res->lr_lvb_sem);
+        lock_res(res);
         *reply_lvb = *res_lvb;
-        up(&res->lr_lvb_sem);
+        unlock_res(res);
 out:
         LDLM_LOCK_PUT(l);
 
index b539f3f..b1ac24c 100644 (file)
@@ -142,6 +142,8 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
                         //GOTO(out, rc = -EPROTO);
                         GOTO(out, rc = 0);
                 }
+
+                lock_res(res);
                 if (new->lvb_size > lvb->lvb_size || !increase) {
                         CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: "
                                LPU64" -> "LPU64"\n", res->lr_name.name[0],
@@ -166,6 +168,7 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
                                lvb->lvb_ctime, new->lvb_ctime);
                         lvb->lvb_ctime = new->lvb_ctime;
                 }
+                unlock_res(res);
         }
 
         /* Update the LVB from the disk inode */
@@ -187,6 +190,7 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
 
+        lock_res(res);
         if (dentry->d_inode->i_size > lvb->lvb_size || !increase) {
                 CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size from disk: "
                        LPU64" -> %llu\n", res->lr_name.name[0],
@@ -216,6 +220,7 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
                LPU64" -> %lu\n", res->lr_name.name[0],
                lvb->lvb_blocks, dentry->d_inode->i_blocks);
         lvb->lvb_blocks = dentry->d_inode->i_blocks;
+        unlock_res(res);
 
         f_dput(dentry);
 out:
index 4dc4bf9..edc83ec 100644 (file)
@@ -2377,7 +2377,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
                 return;
         }
 
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
 #ifdef __KERNEL__
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -2393,7 +2393,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
         }
 #endif
         lock->l_ast_data = data;
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
         LDLM_LOCK_PUT(lock);
 }