Whamcloud - gitweb
b=1971
authorphil <phil>
Wed, 17 Sep 2003 19:17:26 +0000 (19:17 +0000)
committerphil <phil>
Wed, 17 Sep 2003 19:17:26 +0000 (19:17 +0000)
backport completion_ast fixes and lustre_dlm.h header updates from b_devel

lustre/include/linux/lustre_dlm.h
lustre/ldlm/l_lock.c
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c

index a0aae86..8f8fe8d 100644 (file)
@@ -37,35 +37,42 @@ typedef enum {
 #define LDLM_NAMESPACE_SERVER 0
 #define LDLM_NAMESPACE_CLIENT 1
 
-#define LDLM_FL_LOCK_CHANGED   (1 << 0) /* extent, mode, or resource changed */
+#define LDLM_FL_LOCK_CHANGED   0x000001 /* extent, mode, or resource changed */
 
 /* If the server returns one of these flags, then the lock was put on that list.
  * If the client sends one of these flags (during recovery ONLY!), it wants the
  * lock added to the specified list, no questions asked. -p */
-#define LDLM_FL_BLOCK_GRANTED  (1 << 1)
-#define LDLM_FL_BLOCK_CONV     (1 << 2)
-#define LDLM_FL_BLOCK_WAIT     (1 << 3)
+#define LDLM_FL_BLOCK_GRANTED  0x000002
+#define LDLM_FL_BLOCK_CONV     0x000004
+#define LDLM_FL_BLOCK_WAIT     0x000008
 
-#define LDLM_FL_CBPENDING      (1 << 4) // this lock is being destroyed
-#define LDLM_FL_AST_SENT       (1 << 5) // blocking or cancel packet was sent
-#define LDLM_FL_WAIT_NOREPROC  (1 << 6)// not a real lock flag,not saved in lock
-#define LDLM_FL_CANCEL         (1 << 7) // cancellation callback already run
+#define LDLM_FL_CBPENDING      0x000010 /* this lock is being destroyed */
+#define LDLM_FL_AST_SENT       0x000020 /* blocking or cancel packet was sent */
+#define LDLM_FL_WAIT_NOREPROC  0x000040 /* not a real flag, not saved in lock */
+#define LDLM_FL_CANCEL         0x000080 /* cancellation callback already run */
 
 /* Lock is being replayed.  This could probably be implied by the fact that one
  * of BLOCK_{GRANTED,CONV,WAIT} is set, but that is pretty dangerous. */
-#define LDLM_FL_REPLAY         (1 << 8)
+#define LDLM_FL_REPLAY         0x000100
 
-#define LDLM_FL_INTENT_ONLY    (1 << 9) /* don't grant lock, just do intent */
-#define LDLM_FL_LOCAL_ONLY     (1 << 10) /* see ldlm_cli_cancel_unused */
+#define LDLM_FL_INTENT_ONLY    0x000200 /* don't grant lock, just do intent */
+#define LDLM_FL_LOCAL_ONLY     0x000400 /* see ldlm_cli_cancel_unused */
 
 /* don't run the cancel callback under ldlm_cli_cancel_unused */
-#define LDLM_FL_NO_CALLBACK    (1 << 11)
+#define LDLM_FL_NO_CALLBACK    0x000800
 
-#define LDLM_FL_HAS_INTENT     (1 << 12) /* lock request has intent */
-#define LDLM_FL_CANCELING      (1 << 13) /* lock cancel has already been sent */
-#define LDLM_FL_LOCAL          (1 << 14) // a local lock (ie, no srv/cli split)
-#define LDLM_FL_WARN           (1 << 15) /* see ldlm_cli_cancel_unused */
-#define LDLM_FL_MATCH_DATA     (1 << 16) /* see ldlm_lock_match */
+#define LDLM_FL_HAS_INTENT     0x001000 /* lock request has intent */
+#define LDLM_FL_CANCELING      0x002000 /* lock cancel has already been sent */
+#define LDLM_FL_LOCAL          0x004000 /* local lock (ie, no srv/cli split) */
+#define LDLM_FL_WARN           0x008000 /* see ldlm_cli_cancel_unused */
+#define LDLM_FL_DISCARD_DATA   0x010000 /* discard (no writeback) on cancel */
+#define LDLM_FL_MATCH_DATA     0x020000 /* see ldlm_lock_match */
+
+/* These are flags that are mapped into the flags and ASTs of blocking locks */
+#define LDLM_AST_DISCARD_DATA  0x80000000 /* Add FL_DISCARD to blocking ASTs */
+
+/* Flags sent in AST lock_flags to be mapped into the receiving lock. */
+#define LDLM_AST_FLAGS         (LDLM_FL_DISCARD_DATA)
 
 /* The blocking callback is overloaded to perform two functions.  These flags
  * indicate which operation should be performed. */
@@ -248,20 +255,20 @@ do {                                                                          \
         if (lock->l_resource == NULL) {                                       \
                 CDEBUG(level, "### " format                                   \
                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "\
-                       "res: \?\? rrc=\?\? type: \?\?\? remote: "             \
+                       "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: "   \
                        LPX64"\n" , ## a, lock, lock->l_handle.h_cookie,       \
                        atomic_read(&lock->l_refc),                            \
                        lock->l_readers, lock->l_writers,                      \
                        ldlm_lockname[lock->l_granted_mode],                   \
                        ldlm_lockname[lock->l_req_mode],                       \
-                       lock->l_remote_handle.cookie);                         \
+                       lock->l_flags, lock->l_remote_handle.cookie);          \
                 break;                                                        \
         }                                                                     \
         if (lock->l_resource->lr_type == LDLM_EXTENT) {                       \
                 CDEBUG(level, "### " format                                   \
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64\
-                       "] remote: "LPX64"\n" , ## a,                          \
+                       "] flags: %x remote: "LPX64"\n" , ## a,                \
                        lock->l_resource->lr_namespace->ns_name, lock,         \
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),   \
                        lock->l_readers, lock->l_writers,                      \
@@ -272,14 +279,14 @@ do {                                                                          \
                        atomic_read(&lock->l_resource->lr_refcount),           \
                        ldlm_typename[lock->l_resource->lr_type],              \
                        lock->l_extent.start, lock->l_extent.end,              \
-                       lock->l_remote_handle.cookie);                         \
+                       lock->l_flags, lock->l_remote_handle.cookie);          \
                 break;                                                        \
         }                                                                     \
         {                                                                     \
                 CDEBUG(level, "### " format                                   \
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
-                       "res: "LPU64"/"LPU64" rrc: %d type: %s remote: "LPX64  \
-                       "\n" , ## a,                                           \
+                       "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x "     \
+                       "remote: "LPX64"\n" , ## a,                            \
                        lock->l_resource->lr_namespace->ns_name,               \
                        lock, lock->l_handle.h_cookie,                         \
                        atomic_read (&lock->l_refc),                           \
@@ -290,7 +297,7 @@ do {                                                                          \
                        lock->l_resource->lr_name.name[1],                     \
                        atomic_read(&lock->l_resource->lr_refcount),           \
                        ldlm_typename[lock->l_resource->lr_type],              \
-                       lock->l_remote_handle.cookie);                         \
+                       lock->l_flags, lock->l_remote_handle.cookie);          \
         }                                                                     \
 } while (0)
 
@@ -306,7 +313,7 @@ do {                                                                          \
  */
 
 #define LDLM_ITER_CONTINUE 1 /* keep iterating */
-#define LDLM_ITER_STOP     0 /* stop iterating */
+#define LDLM_ITER_STOP     2 /* stop iterating */
 
 typedef int (*ldlm_iterator_t)(struct ldlm_lock *, void *);
 typedef int (*ldlm_res_iterator_t)(struct ldlm_resource *, void *);
@@ -374,18 +381,9 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen);
 int ldlm_lock_match(struct ldlm_namespace *ns, int flags, struct ldlm_res_id *,
                     __u32 type, void *cookie, int cookielen, ldlm_mode_t mode,
                     void *data, struct lustre_handle *);
-struct ldlm_lock *
-ldlm_lock_create(struct ldlm_namespace *ns,
-                 struct lustre_handle *parent_lock_handle, struct ldlm_res_id,
-                 __u32 type, ldlm_mode_t, ldlm_blocking_callback,
-                 void *data);
-ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
-                               void *cookie, int cookie_len, int *flags,
-                               ldlm_completion_callback completion);
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         int *flags);
 void ldlm_lock_cancel(struct ldlm_lock *lock);
index c439eed..ebf49d6 100644 (file)
@@ -46,7 +46,6 @@
 #endif
 
 #include <linux/lustre_dlm.h>
-#include <linux/obd_class.h>
 #include <linux/lustre_lib.h>
 
 /* invariants:
index f6a9f5e..08ee7fa 100644 (file)
@@ -40,6 +40,8 @@ int ldlm_extent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
         RETURN(1);
 }
 
+#include "ldlm_internal.h"
+
 /* The purpose of this function is to return:
  * - the maximum extent
  * - containing the requested extent
index b8bfdac..5aca55f 100644 (file)
@@ -1 +1,13 @@
+/* ldlm_request.c */
 int ldlm_cancel_lru(struct ldlm_namespace *ns);
+
+/* ldlm_lock.c */
+void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
+                    int run_ast);
+struct ldlm_lock *
+ldlm_lock_create(struct ldlm_namespace *ns,
+                 struct lustre_handle *parent_lock_handle, struct ldlm_res_id,
+                 __u32 type, ldlm_mode_t, ldlm_blocking_callback,
+                 ldlm_completion_callback, void *data);
+ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
+                               void *cookie, int cookie_len, int *flags);
index 4b05655..a1b5faa 100644 (file)
@@ -27,7 +27,6 @@
 # include <linux/slab.h>
 # include <linux/module.h>
 # include <linux/lustre_dlm.h>
-# include <linux/lustre_mds.h>
 #else
 # include <liblustre.h>
 # include <linux/kp30.h>
@@ -410,6 +409,8 @@ static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
         if (new && (lock->l_flags & LDLM_FL_AST_SENT))
                 GOTO(out, 0);
 
+        CDEBUG(D_OTHER, "lock %p incompatible; sending blocking AST.\n", lock);
+
         OBD_ALLOC(w, sizeof(*w));
         if (!w) {
                 LBUG();
@@ -565,11 +566,8 @@ static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
 
                 rc = 0;
 
-                if (send_cbs && child->l_blocking_ast != NULL) {
-                        CDEBUG(D_OTHER, "lock %p incompatible; sending "
-                               "blocking AST.\n", child);
+                if (send_cbs && child->l_blocking_ast != NULL)
                         ldlm_add_ast_work_item(child, lock, NULL, 0);
-                }
         }
 
         return rc;
@@ -597,7 +595,8 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
  *  - ldlm_reprocess_queue
  *  - ldlm_lock_convert
  */
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen)
+void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
+                     int run_ast)
 {
         struct ldlm_resource *res = lock->l_resource;
         ENTRY;
@@ -609,7 +608,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen)
         if (lock->l_granted_mode < res->lr_most_restr)
                 res->lr_most_restr = lock->l_granted_mode;
 
-        if (lock->l_completion_ast != NULL)
+        if (run_ast && lock->l_completion_ast != NULL)
                 ldlm_add_ast_work_item(lock, NULL, data, datalen);
 
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
@@ -764,6 +763,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                    struct ldlm_res_id res_id, __u32 type,
                                    ldlm_mode_t mode,
                                    ldlm_blocking_callback blocking,
+                                   ldlm_completion_callback completion,
                                    void *data)
 {
         struct ldlm_resource *res, *parent_res = NULL;
@@ -791,15 +791,14 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
         lock->l_req_mode = mode;
         lock->l_data = data;
         lock->l_blocking_ast = blocking;
+        lock->l_completion_ast = completion;
 
         RETURN(lock);
 }
 
 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                                struct ldlm_lock **lockp,
-                               void *cookie, int cookie_len,
-                               int *flags,
-                               ldlm_completion_callback completion)
+                               void *cookie, int cookie_len, int *flags)
 {
         struct ldlm_resource *res;
         struct ldlm_lock *lock = *lockp;
@@ -863,7 +862,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                 else
-                        ldlm_grant_lock(lock, NULL, 0);
+                        ldlm_grant_lock(lock, NULL, 0, 0);
                 GOTO(out, ELDLM_OK);
         } else if (*flags & LDLM_FL_REPLAY) {
                 if (*flags & LDLM_FL_BLOCK_CONV) {
@@ -873,7 +872,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                         GOTO(out, ELDLM_OK);
                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
-                        ldlm_grant_lock(lock, NULL, 0);
+                        ldlm_grant_lock(lock, NULL, 0, 0);
                         GOTO(out, ELDLM_OK);
                 }
                 /* If no flags, fall through to normal enqueue path. */
@@ -895,12 +894,9 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 *flags |= LDLM_FL_BLOCK_GRANTED;
                 GOTO(out, ELDLM_OK);
         }
-        ldlm_grant_lock(lock, NULL, 0);
+        ldlm_grant_lock(lock, NULL, 0, 0);
         EXIT;
       out:
-        /* Don't set 'completion_ast' until here so that if the lock is granted
-         * immediately we don't do an unnecessary completion call. */
-        lock->l_completion_ast = completion;
         l_unlock(&ns->ns_lock);
         return ELDLM_OK;
 }
@@ -922,7 +918,7 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res,
                         RETURN(1);
 
                 list_del_init(&pending->l_res_link);
-                ldlm_grant_lock(pending, NULL, 0);
+                ldlm_grant_lock(pending, NULL, 0, 1);
         }
 
         RETURN(0);
@@ -1034,6 +1030,9 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         struct ldlm_namespace *ns;
         ENTRY;
 
+        /* There's no race between calling this and taking the ns lock below;
+         * a lock can only be put on the waiting list once, because it can only
+         * issue a blocking AST once. */
         ldlm_del_waiting_lock(lock);
 
         res = lock->l_resource;
@@ -1118,7 +1117,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                         LBUG();
 
                         res->lr_tmp = &rpc_list;
-                        ldlm_grant_lock(lock, NULL, 0);
+                        ldlm_grant_lock(lock, NULL, 0, 0);
                         res->lr_tmp = NULL;
                         granted = 1;
                         /* FIXME: completion handling not with ns_lock held ! */
index 016a471..9350539 100644 (file)
@@ -21,7 +21,9 @@
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
-#define EXPORT_SYMTAB
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
@@ -35,6 +37,8 @@
 
 #include <linux/lustre_dlm.h>
 #include <linux/obd_class.h>
+#include "ldlm_internal.h"
+
 extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
 extern struct lustre_lock ldlm_handle_lock;
@@ -454,7 +458,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                                 dlm_req->lock_desc.l_resource.lr_name,
                                 dlm_req->lock_desc.l_resource.lr_type,
                                 dlm_req->lock_desc.l_req_mode,
-                                blocking_callback, NULL);
+                                blocking_callback, completion_callback, NULL);
         if (!lock)
                 GOTO(out, err = -ENOMEM);
 
@@ -471,7 +475,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
-                                &flags, completion_callback);
+                                &flags);
         if (err)
                 GOTO(out, err);
 
@@ -665,7 +669,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 LDLM_DEBUG(lock, "completion AST, new resource");
         }
         lock->l_resource->lr_tmp = &ast_list;
-        ldlm_grant_lock(lock, req, sizeof(*req));
+        ldlm_grant_lock(lock, req, sizeof(*req), 1);
         lock->l_resource->lr_tmp = NULL;
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
         l_unlock(&ns->ns_lock);
index aac4213..4c21bc6 100644 (file)
@@ -29,6 +29,8 @@
 #include <linux/obd_class.h>
 #include <linux/obd.h>
 
+#include "ldlm_internal.h"
+
 static void interrupted_completion_wait(void *data)
 {
 }
@@ -146,7 +148,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         }
 
         lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode,
-                                blocking, data);
+                                blocking, completion, data);
         if (!lock)
                 GOTO(out_nolock, err = -ENOMEM);
         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
@@ -155,8 +157,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         ldlm_lock2handle(lock, lockh);
         lock->l_flags |= LDLM_FL_LOCAL;
 
-        err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
-                                completion);
+        err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
@@ -218,14 +219,9 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 LASSERT(connh == lock->l_connh);
         } else {
                 lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
-                                        mode, blocking, data);
+                                        mode, blocking, completion, data);
                 if (lock == NULL)
                         GOTO(out_nolock, rc = -ENOMEM);
-                /* ugh.  I set this early (instead of waiting for _enqueue)
-                 * because the completion AST might arrive early, and we need
-                 * (in just this one case) to run the completion_cb even if it
-                 * arrives before the reply. */
-                lock->l_completion_ast = completion;
                 LDLM_DEBUG(lock, "client-side enqueue START");
                 /* for the local lock, add the reference */
                 ldlm_lock_addref_internal(lock, mode);
@@ -344,11 +340,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         }
 
         if (!is_replay) {
-                l_lock(&ns->ns_lock);
-                lock->l_completion_ast = NULL;
-                rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
-                                       completion);
-                l_unlock(&ns->ns_lock);
+                rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
                 if (lock->l_completion_ast)
                         lock->l_completion_ast(lock, *flags, NULL);
         }
@@ -813,7 +805,6 @@ void ldlm_change_cbdata(struct ldlm_namespace *ns,
                        void *data)
 {
         struct ldlm_resource *res;
-        int rc = 0;
         ENTRY;
 
         if (ns == NULL) {
@@ -828,7 +819,7 @@ void ldlm_change_cbdata(struct ldlm_namespace *ns,
         }
 
         l_lock(&ns->ns_lock);
-        rc = ldlm_resource_foreach(res, iter, data);
+        ldlm_resource_foreach(res, iter, data);
         l_unlock(&ns->ns_lock);
         ldlm_resource_putref(res);
         EXIT;