Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 81cc428..62272fa 100644 (file)
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
-#include <linux/slab.h>
-#include <linux/module.h>
-#include <linux/lustre_dlm.h>
-#include <linux/lustre_mds.h>
+# include <linux/slab.h>
+# include <linux/module.h>
+# include <linux/lustre_dlm.h>
+# include <linux/lustre_mds.h>
 #else
-#include <liblustre.h>
-#include <linux/kp30.h>
+# include <liblustre.h>
+# include <linux/kp30.h>
 #endif
 
 #include <linux/obd_class.h>
+#include "ldlm_internal.h"
 
 //struct lustre_lock ldlm_everything_lock;
 
@@ -154,7 +155,7 @@ void ldlm_lock_put(struct ldlm_lock *lock)
                 if (lock->l_parent)
                         LDLM_LOCK_PUT(lock->l_parent);
 
-                PORTAL_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
+                OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
                 l_unlock(&ns->ns_lock);
         }
 
@@ -248,7 +249,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         if (resource == NULL)
                 LBUG();
 
-        PORTAL_SLAB_ALLOC(lock, ldlm_lock_slab, sizeof(*lock));
+        OBD_SLAB_ALLOC(lock, ldlm_lock_slab, SLAB_KERNEL, sizeof(*lock));
         if (lock == NULL)
                 RETURN(NULL);
 
@@ -318,7 +319,6 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
 {
-        POISON(&lockh->addr, 0x69, sizeof(lockh->addr));
         lockh->cookie = lock->l_handle.h_cookie;
 }
 
@@ -447,10 +447,6 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
 }
 
-/* Args: unlocked lock */
-int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
-                                    struct ldlm_res_id, int flags);
-
 void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
 {
         struct ldlm_namespace *ns;
@@ -484,17 +480,14 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                                "warning\n");
 
                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
-
-                if (lock->l_blocking_ast == NULL) {
-                        /* The lock wasn't even fully formed; just destroy it */
-                        ldlm_lock_destroy(lock);
-                }
                 l_unlock(&ns->ns_lock);
 
                 /* FIXME: need a real 'desc' here */
                 if (lock->l_blocking_ast != NULL)
                         lock->l_blocking_ast(lock, NULL, lock->l_data,
                                              LDLM_CB_BLOCKING);
+                else
+                        LDLM_DEBUG(lock, "No blocking AST?");
         } else if (ns->ns_client && !lock->l_readers && !lock->l_writers) {
                 /* If this is a client-side namespace and this was the last
                  * reference, put it on the LRU. */
@@ -533,8 +526,8 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
-        ldlm_lock_decref_internal(lock, mode);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        ldlm_lock_decref_internal(lock, mode);
         LDLM_LOCK_PUT(lock);
 }
 
@@ -630,7 +623,17 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                 if (lock == old_lock)
                         break;
 
-                if (lock->l_flags & LDLM_FL_CBPENDING)
+                /* llite sometimes wants to match locks that will be
+                 * canceled when their users drop, but we allow it to match
+                 * if it passes in CBPENDING and the lock still has users.
+                 * this is generally only going to be used by children 
+                 * whose parents already hold a lock so forward progress
+                 * can still happen. */
+                if (lock->l_flags & LDLM_FL_CBPENDING &&
+                    !(flags & LDLM_FL_CBPENDING))
+                        continue;
+                if (lock->l_flags & LDLM_FL_CBPENDING &&
+                    lock->l_readers == 0 && lock->l_writers == 0)
                         continue;
 
                 if (lock->l_req_mode != mode)
@@ -666,6 +669,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
  *     server (ie, connh is NULL)
  * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
  *     list will be considered
+ * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
+ *     to be canceled can still be matched as long as they still have reader
+ *     or writer refernces
  *
  * Returns 1 if it finds an already-existing lock that is compatible; in this
  * case, lockh is filled in with a addref()ed lock
@@ -710,14 +716,15 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                 GOTO(out, rc = 1);
 
         EXIT;
      out:
+ out:
         ldlm_resource_putref(res);
         l_unlock(&ns->ns_lock);
 
         if (lock) {
                 ldlm_lock2handle(lock, lockh);
                 if (lock->l_completion_ast)
-                        lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, NULL);
+                        lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
+                                               NULL);
         }
         if (rc)
                 LDLM_DEBUG(lock, "matched");
@@ -734,7 +741,9 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                    struct lustre_handle *parent_lock_handle,
                                    struct ldlm_res_id res_id, __u32 type,
-                                   ldlm_mode_t mode, void *data, void *cp_data)
+                                   ldlm_mode_t mode,
+                                   ldlm_blocking_callback blocking,
+                                   void *data)
 {
         struct ldlm_resource *res, *parent_res = NULL;
         struct ldlm_lock *lock, *parent_lock = NULL;
@@ -760,7 +769,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
 
         lock->l_req_mode = mode;
         lock->l_data = data;
-        lock->l_cp_data = cp_data;
+        lock->l_blocking_ast = blocking;
 
         RETURN(lock);
 }
@@ -769,8 +778,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                                struct ldlm_lock **lockp,
                                void *cookie, int cookie_len,
                                int *flags,
-                               ldlm_completion_callback completion,
-                               ldlm_blocking_callback blocking)
+                               ldlm_completion_callback completion)
 {
         struct ldlm_resource *res;
         struct ldlm_lock *lock = *lockp;
@@ -779,7 +787,6 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
         ENTRY;
 
         res = lock->l_resource;
-        lock->l_blocking_ast = blocking;
 
         if (res->lr_type == LDLM_EXTENT)
                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
@@ -867,12 +874,6 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 *flags |= LDLM_FL_BLOCK_GRANTED;
                 GOTO(out, ELDLM_OK);
         }
-
-        if (lock->l_granted_cb != NULL && lock->l_data != NULL) {
-                /* We just -know- */
-                struct ptlrpc_request *req = lock->l_data;
-                lock->l_granted_cb(lock, req->rq_repmsg, 0);
-        }
         ldlm_grant_lock(lock, NULL, 0);
         EXIT;
       out:
@@ -994,11 +995,14 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                 lock->l_flags |= LDLM_FL_CANCEL;
-                if (lock->l_blocking_ast)
+                if (lock->l_blocking_ast) {
+                        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                         lock->l_blocking_ast(lock, NULL, lock->l_data,
                                              LDLM_CB_CANCELING);
-                else
+                        return;
+                } else {
                         LDLM_DEBUG(lock, "no blocking ast");
+                }
         }
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 }
@@ -1023,7 +1027,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
                 LBUG();
         }
 
-        ldlm_cancel_callback(lock);
+        ldlm_cancel_callback(lock); /* XXX FIXME bug 1030 */
 
         ldlm_resource_unlink_lock(lock);
         ldlm_lock_destroy(lock);
@@ -1031,7 +1035,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         EXIT;
 }
 
-int ldlm_lock_set_data(struct lustre_handle *lockh, void *data, void *cp_data)
+int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
 {
         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
         ENTRY;
@@ -1040,7 +1044,6 @@ int ldlm_lock_set_data(struct lustre_handle *lockh, void *data, void *cp_data)
                 RETURN(-EINVAL);
 
         lock->l_data = data;
-        lock->l_cp_data = cp_data;
 
         LDLM_LOCK_PUT(lock);
 
@@ -1118,6 +1121,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
 void ldlm_lock_dump(int level, struct ldlm_lock *lock)
 {
         char ver[128];
+        struct obd_device *obd;
 
         if (!((portal_debug | D_ERROR) & level))
                 return;
@@ -1136,13 +1140,21 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock)
 
         CDEBUG(level, "  -- Lock dump: %p (%s) (rc: %d)\n", lock, ver,
                atomic_read(&lock->l_refc));
-        if (lock->l_export && lock->l_export->exp_connection)
+        obd = class_conn2obd(lock->l_connh);
+        if (lock->l_export && lock->l_export->exp_connection) {
                 CDEBUG(level, "  Node: NID "LPX64" on %s (rhandle: "LPX64")\n",
                        lock->l_export->exp_connection->c_peer.peer_nid,
                        lock->l_export->exp_connection->c_peer.peer_ni->pni_name,
                        lock->l_remote_handle.cookie);
-        else
+        } else if (obd == NULL) {
                 CDEBUG(level, "  Node: local\n");
+        } else {
+                struct obd_import *imp = obd->u.cli.cl_import;
+                CDEBUG(level, "  Node: NID "LPX64" on %s (rhandle: "LPX64")\n",
+                       imp->imp_connection->c_peer.peer_nid,
+                       imp->imp_connection->c_peer.peer_ni->pni_name,
+                       lock->l_remote_handle.cookie);
+        }
         CDEBUG(level, "  Parent: %p\n", lock->l_parent);
         CDEBUG(level, "  Resource: %p ("LPD64")\n", lock->l_resource,
                lock->l_resource->lr_name.name[0]);