Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 81b3b5d..a1220ab 100644 (file)
@@ -25,7 +25,6 @@
 
 #include <linux/slab.h>
 #include <linux/module.h>
-#include <linux/random.h>
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_mds.h>
 #include <linux/obd_class.h>
@@ -102,12 +101,13 @@ ldlm_res_compat ldlm_res_compat_table[] = {
 
 static ldlm_res_policy ldlm_intent_policy_func;
 
-static int ldlm_plain_policy(struct ldlm_lock *lock, void *req_cookie,
-                             ldlm_mode_t mode, int flags, void *data)
+static int ldlm_plain_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
+                             void *req_cookie, ldlm_mode_t mode, int flags,
+                             void *data)
 {
         if ((flags & LDLM_FL_HAS_INTENT) && ldlm_intent_policy_func) {
-                return ldlm_intent_policy_func(lock, req_cookie, mode, flags, 
-                                               data);
+                return ldlm_intent_policy_func(ns, lock, req_cookie, mode,
+                                               flags, data);
         }
 
         return ELDLM_OK;
@@ -186,6 +186,8 @@ void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
         EXIT;
 }
 
+/* Only called with strict == 0 by recovery, to mark in-use locks as
+ * should-be-destroyed */
 void ldlm_lock_destroy(struct ldlm_lock *lock)
 {
         ENTRY;
@@ -194,16 +196,16 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
         if (!list_empty(&lock->l_children)) {
                 LDLM_DEBUG(lock, "still has children (%p)!",
                            lock->l_children.next);
-                ldlm_lock_dump(lock);
+                ldlm_lock_dump(D_ERROR, lock);
                 LBUG();
         }
         if (lock->l_readers || lock->l_writers) {
                 LDLM_DEBUG(lock, "lock still has references");
-                ldlm_lock_dump(lock);
+                ldlm_lock_dump(D_OTHER, lock);
         }
 
         if (!list_empty(&lock->l_res_link)) {
-                ldlm_lock_dump(lock);
+                ldlm_lock_dump(D_ERROR, lock);
                 LBUG();
         }
 
@@ -217,6 +219,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
 
         list_del_init(&lock->l_export_chain);
         ldlm_lock_remove_from_lru(lock);
+        portals_handle_unhash(&lock->l_handle);
 
 #if 0
         /* Wake anyone waiting for this lock */
@@ -257,7 +260,6 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         if (lock == NULL)
                 RETURN(NULL);
 
-        get_random_bytes(&lock->l_random, sizeof(__u64));
         lock->l_resource = ldlm_resource_getref(resource);
 
         atomic_set(&lock->l_refc, 2);
@@ -279,12 +281,15 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
                 l_unlock(&parent->l_resource->lr_namespace->ns_lock);
         }
 
+        INIT_LIST_HEAD(&lock->l_handle.h_link);
+        portals_handle_hash(&lock->l_handle, lock_handle_addref);
+
         RETURN(lock);
 }
 
-int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
+int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
+                              __u64 new_resid[3])
 {
-        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
         struct ldlm_resource *oldres = lock->l_resource;
         ENTRY;
 
@@ -321,66 +326,63 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
 
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
 {
-        lockh->addr = (__u64) (unsigned long)lock;
-        lockh->cookie = lock->l_random;
+        //lockh->addr = (__u64)(unsigned long)lock;
+        memset(&lockh->addr, 0x69, sizeof(lockh->addr));
+        lockh->cookie = lock->l_handle.h_cookie;
 }
 
-/* 
- * if flags: atomically get the lock and set the flags. 
- * Return NULL if flag already set
+/* if flags: atomically get the lock and set the flags. 
+ *           Return NULL if flag already set
  */
 
-struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int strict,
-                                     int flags)
+struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
 {
         struct ldlm_lock *lock = NULL, *retval = NULL;
         ENTRY;
 
         LASSERT(handle);
 
-        if (!handle->addr)
+        lock = portals_handle2object(handle->cookie);
+        if (lock == NULL)
                 RETURN(NULL);
 
-        lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
-        if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock)) {
-                //CERROR("bogus lock %p\n", lock);
-                GOTO(out2, retval);
-        }
-
-        if (lock->l_random != handle->cookie) {
-                //CERROR("bogus cookie: lock %p has "LPX64" vs. handle "LPX64
-                //       "\n", lock, lock->l_random, handle->cookie);
-                GOTO(out2, NULL);
-        }
-        if (!lock->l_resource) {
-                CERROR("trying to lock bogus resource: lock %p\n", lock);
-                //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
-                GOTO(out2, retval);
-        }
-        if (!lock->l_resource->lr_namespace) {
-                CERROR("trying to lock bogus namespace: lock %p\n", lock);
-                //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
-                GOTO(out2, retval);
-        }
+        LASSERT(lock->l_resource != NULL);
+        LASSERT(lock->l_resource->lr_namespace != NULL);
 
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        if (strict && lock->l_destroyed) {
+
+        /* It's unlikely but possible that someone marked the lock as
+         * destroyed after we did handle2object on it */
+        if (lock->l_destroyed) {
                 CERROR("lock already destroyed: lock %p\n", lock);
-                //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
-                GOTO(out, NULL);
+                LDLM_LOCK_PUT(lock);
+                GOTO(out, retval);
         }
 
-        if (flags && (lock->l_flags & flags))
-                GOTO(out, NULL);
+        if (flags && (lock->l_flags & flags)) {
+                LDLM_LOCK_PUT(lock);
+                GOTO(out, retval);
+        }
 
         if (flags)
                 lock->l_flags |= flags;
 
-        retval = LDLM_LOCK_GET(lock);
+        retval = lock;
         EXIT;
  out:
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- out2:
+        return retval;
+}
+
+struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
+                                      struct lustre_handle *handle)
+{
+        struct ldlm_lock *retval = NULL;
+
+        l_lock(&ns->ns_lock);
+        retval = __ldlm_handle2lock(handle, 0);
+        l_unlock(&ns->ns_lock);
+
         return retval;
 }
 
@@ -456,7 +458,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
 {
-        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0, 0);
+        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
         struct ldlm_namespace *ns;
         ENTRY;
 
@@ -466,10 +468,13 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         ns = lock->l_resource->lr_namespace;
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) {
+                LASSERT(lock->l_readers > 0);
                 lock->l_readers--;
-        else
+        } else {
+                LASSERT(lock->l_writers > 0);
                 lock->l_writers--;
+        }
 
         /* If we received a blocked AST and this was the last reference,
          * run the callback. */
@@ -493,8 +498,9 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
                 ns->ns_nr_unused++;
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 ldlm_cancel_lru(ns);
-        } else
+        } else {
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        }
 
         LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
         LDLM_LOCK_PUT(lock);    /* matches the handle2lock above */
@@ -711,8 +717,8 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
         return lock;
 }
 
-/* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
-ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
+ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
+                               struct ldlm_lock *lock,
                                void *cookie, int cookie_len,
                                int *flags,
                                ldlm_completion_callback completion,
@@ -734,7 +740,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
         if (!local && !(*flags & LDLM_FL_REPLAY) &&
             (policy = ldlm_res_policy_table[res->lr_type])) {
                 int rc;
-                rc = policy(lock, cookie, lock->l_req_mode, *flags, NULL);
+                rc = policy(ns, lock, cookie, lock->l_req_mode, *flags, NULL);
 
                 if (rc == ELDLM_LOCK_CHANGED) {
                         res = lock->l_resource;
@@ -745,7 +751,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
                 }
         }
 
-        l_lock(&res->lr_namespace->ns_lock);
+        l_lock(&ns->ns_lock);
         if (local && lock->l_req_mode == lock->l_granted_mode) {
                 /* The server returned a blocked lock, but it was granted before
                  * we got a chance to actually enqueue it.  We don't need to do
@@ -767,7 +773,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
          * FIXME (bug 268): Detect obvious lies by checking compatibility in
          * granted/converting queues. */
         ldlm_resource_unlink_lock(lock);
-        if (local || (*flags & LDLM_FL_REPLAY)) {
+        if (local) {
                 if (*flags & LDLM_FL_BLOCK_CONV)
                         ldlm_resource_add_lock(res, res->lr_converting.prev,
                                                lock);
@@ -776,6 +782,19 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
                 else
                         ldlm_grant_lock(lock);
                 GOTO(out, ELDLM_OK);
+        } else if (*flags & LDLM_FL_REPLAY) {
+                if (*flags & LDLM_FL_BLOCK_CONV) {
+                        ldlm_resource_add_lock(res, res->lr_converting.prev,
+                                               lock);
+                        GOTO(out, ELDLM_OK);
+                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
+                        ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+                        GOTO(out, ELDLM_OK);
+                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
+                        ldlm_grant_lock(lock);
+                        GOTO(out, ELDLM_OK);
+                }
+                /* If no flags, fall through to normal enqueue path. */
         }
 
         /* FIXME: We may want to optimize by checking lr_most_restr */
@@ -798,7 +817,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
         ldlm_grant_lock(lock);
         EXIT;
       out:
-        l_unlock(&res->lr_namespace->ns_lock);
+        l_unlock(&ns->ns_lock);
         /* Don't set 'completion_ast' until here so that if the lock is granted
          * immediately we don't do an unnecessary completion call. */
         lock->l_completion_ast = completion;
@@ -828,10 +847,10 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res,
         RETURN(0);
 }
 
-void ldlm_run_ast_work(struct list_head *rpc_list)
+int ldlm_run_ast_work(struct list_head *rpc_list)
 {
         struct list_head *tmp, *pos;
-        int rc;
+        int rc, retval = 0;
         ENTRY;
 
         list_for_each_safe(tmp, pos, rpc_list) {
@@ -844,20 +863,34 @@ void ldlm_run_ast_work(struct list_head *rpc_list)
                                  w->w_datalen, LDLM_CB_BLOCKING);
                 else
                         rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
-                if (rc)
+                if (rc == -ERESTART)
+                        retval = rc;
+                else if (rc)
                         CERROR("Failed AST - should clean & disconnect "
                                "client\n");
                 LDLM_LOCK_PUT(w->w_lock);
                 list_del(&w->w_list);
                 OBD_FREE(w, sizeof(*w));
         }
-        EXIT;
+        RETURN(retval);
+}
+
+static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
+{
+        ldlm_reprocess_all(res);
+        return LDLM_ITER_CONTINUE;
+}
+
+void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
+{
+        (void)ldlm_namespace_foreach_res(ns, reprocess_one_queue, NULL);
 }
 
 /* Must be called with resource->lr_lock not taken. */
 void ldlm_reprocess_all(struct ldlm_resource *res)
 {
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+        int rc;
         ENTRY;
 
         /* Local lock trees don't get reprocessed. */
@@ -866,6 +899,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
                 return;
         }
 
+ restart:
         l_lock(&res->lr_namespace->ns_lock);
         res->lr_tmp = &rpc_list;
 
@@ -876,7 +910,9 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         res->lr_tmp = NULL;
         l_unlock(&res->lr_namespace->ns_lock);
 
-        ldlm_run_ast_work(&rpc_list);
+        rc = ldlm_run_ast_work(&rpc_list);
+        if (rc == -ERESTART)
+                goto restart;
         EXIT;
 }
 
@@ -905,10 +941,12 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ns = res->lr_namespace;
 
         l_lock(&ns->ns_lock);
+        /* Please do not, no matter how tempting, remove this LBUG without
+         * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
                 LDLM_DEBUG(lock, "lock still has references");
-                ldlm_lock_dump(lock);
-                //LBUG();
+                ldlm_lock_dump(D_OTHER, lock);
+                LBUG();
         }
 
         ldlm_cancel_callback(lock);
@@ -1001,18 +1039,18 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
         RETURN(res);
 }
 
-void ldlm_lock_dump(struct ldlm_lock *lock)
+void ldlm_lock_dump(int level, struct ldlm_lock *lock)
 {
         char ver[128];
 
-        if (!(portal_debug & D_OTHER))
+        if (!(portal_debug & level))
                 return;
 
         if (RES_VERSION_SIZE != 4)
                 LBUG();
 
         if (!lock) {
-                CDEBUG(D_OTHER, "  NULL LDLM lock\n");
+                CDEBUG(level, "  NULL LDLM lock\n");
                 return;
         }
 
@@ -1020,27 +1058,26 @@ void ldlm_lock_dump(struct ldlm_lock *lock)
                  lock->l_version[0], lock->l_version[1],
                  lock->l_version[2], lock->l_version[3]);
 
-        CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
+        CDEBUG(level, "  -- Lock dump: %p (%s)\n", lock, ver);
         if (lock->l_export && lock->l_export->exp_connection)
-                CDEBUG(D_OTHER, "  Node: NID %x (rhandle: "LPX64")\n",
+                CDEBUG(level, "  Node: NID %x (rhandle: "LPX64")\n",
                        lock->l_export->exp_connection->c_peer.peer_nid,
-                       lock->l_remote_handle.addr);
+                       lock->l_remote_handle.cookie);
         else
-                CDEBUG(D_OTHER, "  Node: local\n");
-        CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
-        CDEBUG(D_OTHER, "  Resource: %p ("LPD64")\n", lock->l_resource,
+                CDEBUG(level, "  Node: local\n");
+        CDEBUG(level, "  Parent: %p\n", lock->l_parent);
+        CDEBUG(level, "  Resource: %p ("LPD64")\n", lock->l_resource,
                lock->l_resource->lr_name[0]);
-        CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
+        CDEBUG(level, "  Requested mode: %d, granted mode: %d\n",
                (int)lock->l_req_mode, (int)lock->l_granted_mode);
-        CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
+        CDEBUG(level, "  Readers: %u ; Writers; %u\n",
                lock->l_readers, lock->l_writers);
         if (lock->l_resource->lr_type == LDLM_EXTENT)
-                CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
-                       (unsigned long long)lock->l_extent.start,
-                       (unsigned long long)lock->l_extent.end);
+                CDEBUG(level, "  Extent: "LPU64" -> "LPU64"\n",
+                       lock->l_extent.start, lock->l_extent.end);
 }
 
-void ldlm_lock_dump_handle(struct lustre_handle *lockh)
+void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
 {
         struct ldlm_lock *lock;
 
@@ -1048,7 +1085,7 @@ void ldlm_lock_dump_handle(struct lustre_handle *lockh)
         if (lock == NULL)
                 return;
 
-        ldlm_lock_dump(lock);
+        ldlm_lock_dump(D_OTHER, lock);
 
         LDLM_LOCK_PUT(lock);
 }