land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index e08ddb4..81cc428 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
+#ifdef __KERNEL__
 #include <linux/slab.h>
 #include <linux/module.h>
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_mds.h>
+#else
+#include <liblustre.h>
+#include <linux/kp30.h>
+#endif
+
 #include <linux/obd_class.h>
 
 //struct lustre_lock ldlm_everything_lock;
@@ -46,6 +52,7 @@ char *ldlm_typename[] = {
         [LDLM_EXTENT] "EXT",
 };
 
+#ifdef __KERNEL__
 char *ldlm_it2str(int it)
 {
         switch (it) {
@@ -59,10 +66,6 @@ char *ldlm_it2str(int it)
                 return "readdir";
         case IT_GETATTR:
                 return "getattr";
-        case IT_TRUNC:
-                return "truncate";
-        case IT_SETATTR:
-                return "setattr";
         case IT_LOOKUP:
                 return "lookup";
         case IT_UNLINK:
@@ -72,13 +75,14 @@ char *ldlm_it2str(int it)
                 return "UNKNOWN";
         }
 }
+#endif
 
 extern kmem_cache_t *ldlm_lock_slab;
 struct lustre_lock ldlm_handle_lock;
 
 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
 
-ldlm_res_compat ldlm_res_compat_table[] = {
+static ldlm_res_compat ldlm_res_compat_table[] = {
         [LDLM_PLAIN] ldlm_plain_compat,
         [LDLM_EXTENT] ldlm_extent_compat,
 };
@@ -97,7 +101,7 @@ static int ldlm_plain_policy(struct ldlm_namespace *ns, struct ldlm_lock **lock,
         return ELDLM_OK;
 }
 
-ldlm_res_policy ldlm_res_policy_table[] = {
+static ldlm_res_policy ldlm_res_policy_table[] = {
         [LDLM_PLAIN] ldlm_plain_policy,
         [LDLM_EXTENT] ldlm_extent_policy,
 };
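
The two hunks above only make the compat and policy tables static; the tables themselves already existed and are indexed by lock type. A hedged sketch of how such a table is typically consumed follows. demo_compat() is illustrative and not part of this commit, and lr_type is assumed to be the resource-type field (LDLM_PLAIN, LDLM_EXTENT, ...) used as the index.

static int demo_compat(struct ldlm_lock *a, struct ldlm_lock *b)
{
        /* lr_type is assumed to name the resource type that indexes the
         * table declared above. */
        ldlm_res_compat fn = ldlm_res_compat_table[a->l_resource->lr_type];

        return fn(a, b);
}
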
@@ -136,7 +140,7 @@ void ldlm_lock_put(struct ldlm_lock *lock)
 
         if (atomic_dec_and_test(&lock->l_refc)) {
                 l_lock(&ns->ns_lock);
-                LDLM_DEBUG0(lock, "final lock_put on destroyed lock, freeing");
+                LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
                 LASSERT(lock->l_destroyed);
                 LASSERT(list_empty(&lock->l_res_link));
 
@@ -207,7 +211,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
 
         list_del_init(&lock->l_export_chain);
         ldlm_lock_remove_from_lru(lock);
-        portals_handle_unhash(&lock->l_handle);
+        class_handle_unhash(&lock->l_handle);
 
 #if 0
         /* Wake anyone waiting for this lock */
@@ -270,7 +274,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         }
 
         INIT_LIST_HEAD(&lock->l_handle.h_link);
-        portals_handle_hash(&lock->l_handle, lock_handle_addref);
+        class_handle_hash(&lock->l_handle, lock_handle_addref);
 
         RETURN(lock);
 }
@@ -314,7 +318,7 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
 {
-        memset(&lockh->addr, 0x69, sizeof(lockh->addr));
+        POISON(&lockh->addr, 0x69, sizeof(lockh->addr));
         lockh->cookie = lock->l_handle.h_cookie;
 }
 
@@ -329,7 +333,7 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
 
         LASSERT(handle);
 
-        lock = portals_handle2object(handle->cookie);
+        lock = class_handle2object(handle->cookie);
         if (lock == NULL)
                 RETURN(NULL);
 
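
Taken together, the class_handle_* renames above cover the whole lifecycle of a lock handle: hash at creation (ldlm_lock_new), cookie out to the peer (ldlm_lock2handle), cookie back to the lock (__ldlm_handle2lock via class_handle2object), unhash at destroy (ldlm_lock_destroy). A hedged sketch of that round trip, using only functions and fields visible in these hunks; demo_handle_round_trip() itself is illustrative.

static void demo_handle_round_trip(struct ldlm_lock *lock)
{
        struct lustre_handle lockh;
        struct ldlm_lock *found;

        /* ldlm_lock_new() registered the lock with
         *   class_handle_hash(&lock->l_handle, lock_handle_addref);
         * which assigned the opaque cookie copied out here. */
        ldlm_lock2handle(lock, &lockh);

        /* A peer hands back only the cookie; resolve it to the lock.
         * class_handle2object() presumably takes a reference via the
         * lock_handle_addref registered at hash time, so drop it again. */
        found = __ldlm_handle2lock(&lockh, 0);
        if (found != NULL)
                ldlm_lock_put(found);

        /* ldlm_lock_destroy() later calls class_handle_unhash(), after
         * which stale cookies no longer resolve to the freed lock. */
}
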
@@ -388,7 +392,7 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
 }
 
 static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
-                                   struct ldlm_lock *new, 
+                                   struct ldlm_lock *new,
                                    void *data, int datalen)
 {
         struct ldlm_ast_work *w;
@@ -479,12 +483,18 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                         CERROR("FL_CBPENDING set on non-local lock--just a "
                                "warning\n");
 
-                LDLM_DEBUG0(lock, "final decref done on cbpending lock");
+                LDLM_DEBUG(lock, "final decref done on cbpending lock");
+
+                if (lock->l_blocking_ast == NULL) {
+                        /* The lock wasn't even fully formed; just destroy it */
+                        ldlm_lock_destroy(lock);
+                }
                 l_unlock(&ns->ns_lock);
 
                 /* FIXME: need a real 'desc' here */
-                lock->l_blocking_ast(lock, NULL, lock->l_data,
-                                     LDLM_CB_BLOCKING);
+                if (lock->l_blocking_ast != NULL)
+                        lock->l_blocking_ast(lock, NULL, lock->l_data,
+                                             LDLM_CB_BLOCKING);
         } else if (ns->ns_client && !lock->l_readers && !lock->l_writers) {
                 /* If this is a client-side namespace and this was the last
                  * reference, put it on the LRU. */
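
The hunk above handles locks that never had a blocking AST installed (an enqueue that never completed): the final decref destroys them directly instead of dereferencing a NULL callback, and any surviving callback is only invoked after ns_lock has been dropped. A hedged sketch of that discipline follows; demo_final_decref() is illustrative and caches the NULL test once, where the committed code re-checks the pointer after the unlock.

static void demo_final_decref(struct ldlm_namespace *ns,
                              struct ldlm_lock *lock)
{
        int have_blocking_ast;

        l_lock(&ns->ns_lock);
        /* A lock whose enqueue never completed has no blocking AST and no
         * owner to call back; just tear it down. */
        have_blocking_ast = (lock->l_blocking_ast != NULL);
        if (!have_blocking_ast)
                ldlm_lock_destroy(lock);
        l_unlock(&ns->ns_lock);

        /* Callbacks are only invoked once ns_lock is dropped, as in the
         * hunk above. */
        if (have_blocking_ast)
                lock->l_blocking_ast(lock, NULL, lock->l_data,
                                     LDLM_CB_BLOCKING);
}
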
@@ -710,7 +720,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                         lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, NULL);
         }
         if (rc)
-                LDLM_DEBUG0(lock, "matched");
+                LDLM_DEBUG(lock, "matched");
         else
                 LDLM_DEBUG_NOLOCK("not matched");
 
@@ -866,10 +876,10 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
         ldlm_grant_lock(lock, NULL, 0);
         EXIT;
       out:
-        l_unlock(&ns->ns_lock);
         /* Don't set 'completion_ast' until here so that if the lock is granted
          * immediately we don't do an unnecessary completion call. */
         lock->l_completion_ast = completion;
+        l_unlock(&ns->ns_lock);
         return ELDLM_OK;
 }
 
@@ -906,13 +916,27 @@ int ldlm_run_ast_work(struct list_head *rpc_list)
                 struct ldlm_ast_work *w =
                         list_entry(tmp, struct ldlm_ast_work, w_list);
 
-                if (w->w_blocking)
+                /* It's possible to receive a completion AST before we've set
+                 * the l_completion_ast pointer: either because the AST arrived
+                 * before the reply, or simply because there's a small race
+                 * window between receiving the reply and finishing the local
+                 * enqueue. (bug 842)
+                 *
+                 * This can't happen with the blocking_ast, however, because we
+                 * will never call the local blocking_ast until we drop our
+                 * reader/writer reference, which we won't do until we get the
+                 * reply and finish enqueueing. */
+                if (w->w_blocking) {
+                        LASSERT(w->w_lock->l_blocking_ast != NULL);
                         rc = w->w_lock->l_blocking_ast
                                 (w->w_lock, &w->w_desc, w->w_data,
                                  LDLM_CB_BLOCKING);
-                else
+                } else if (w->w_lock->l_completion_ast != NULL) {
                         rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags,
                                                          w->w_data);
+                } else {
+                        rc = 0;
+                }
                 if (rc == -ERESTART)
                         retval = rc;
                 else if (rc)
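
ldlm_run_ast_work() now tolerates a completion AST that is processed before l_completion_ast has been installed (bug 842), which is possible because the previous hunk only sets that callback at the very end of ldlm_lock_enqueue(), still under ns_lock. A hedged sketch of the per-item dispatch, using the work-item fields shown above; demo_run_one_ast() is illustrative.

static int demo_run_one_ast(struct ldlm_ast_work *w)
{
        if (w->w_blocking) {
                /* The blocking AST is always set before the reader/writer
                 * reference is dropped, so it must exist here. */
                LASSERT(w->w_lock->l_blocking_ast != NULL);
                return w->w_lock->l_blocking_ast(w->w_lock, &w->w_desc,
                                                 w->w_data, LDLM_CB_BLOCKING);
        }

        if (w->w_lock->l_completion_ast != NULL)
                return w->w_lock->l_completion_ast(w->w_lock, w->w_flags,
                                                   w->w_data);

        /* Completion AST raced ahead of the local enqueue; nothing to call
         * yet, and that is not an error. */
        return 0;
}
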
@@ -974,7 +998,7 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
                         lock->l_blocking_ast(lock, NULL, lock->l_data,
                                              LDLM_CB_CANCELING);
                 else
-                        LDLM_DEBUG0(lock, "no blocking ast");
+                        LDLM_DEBUG(lock, "no blocking ast");
         }
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 }
@@ -994,7 +1018,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         /* Please do not, no matter how tempting, remove this LBUG without
          * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
-                LDLM_DEBUG0(lock, "lock still has references");
+                LDLM_DEBUG(lock, "lock still has references");
                 ldlm_lock_dump(D_OTHER, lock);
                 LBUG();
         }
@@ -1113,8 +1137,9 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock)
         CDEBUG(level, "  -- Lock dump: %p (%s) (rc: %d)\n", lock, ver,
                atomic_read(&lock->l_refc));
         if (lock->l_export && lock->l_export->exp_connection)
-                CDEBUG(level, "  Node: NID %x (rhandle: "LPX64")\n",
+                CDEBUG(level, "  Node: NID "LPX64" on %s (rhandle: "LPX64")\n",
                        lock->l_export->exp_connection->c_peer.peer_nid,
+                       lock->l_export->exp_connection->c_peer.peer_ni->pni_name,
                        lock->l_remote_handle.cookie);
         else
                 CDEBUG(level, "  Node: local\n");