Whamcloud - gitweb
Branch: HEAD
authorwangdi <wangdi>
Thu, 15 Sep 2005 17:19:21 +0000 (17:19 +0000)
committerwangdi <wangdi>
Thu, 15 Sep 2005 17:19:21 +0000 (17:19 +0000)
land posix flock patch of Don.

14 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lib.h
lustre/ldlm/l_lock.c
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/file.c
lustre/mds/handler.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/pack_generic.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index adfae7f..5894b24 100644 (file)
@@ -67,12 +67,15 @@ typedef enum {
 #define LDLM_FL_DISCARD_DATA   0x010000 /* discard (no writeback) on cancel */
 #define LDLM_FL_CONFIG_CHANGE  0x020000 /* see ldlm_cli_cancel_unused */
 
-#define LDLM_FL_NO_TIMEOUT     0x020000 /* Blocked by group lock - wait
+#define LDLM_FL_NO_TIMEOUT     0x040000 /* Blocked by group lock - wait
                                          * indefinitely */
 
 /* file & record locking */
-#define LDLM_FL_BLOCK_NOWAIT   0x040000 /* server told not to wait if blocked */
-#define LDLM_FL_TEST_LOCK      0x080000 /* return blocking lock */
+#define LDLM_FL_BLOCK_NOWAIT   0x080000 /* server told not to wait if blocked */
+#define LDLM_FL_TEST_LOCK      0x100000 /* return blocking lock */
+#define LDLM_FL_GET_BLOCKING   0x200000 /* return updated blocking proc info */
+#define LDLM_FL_DEADLOCK_CHK   0x400000 /* check for deadlock */
+#define LDLM_FL_DEADLOCK_DEL   0x800000 /* lock no longer blocked */
 
 /* These are flags that are mapped into the flags and ASTs of blocking locks */
 #define LDLM_AST_DISCARD_DATA  0x80000000 /* Add FL_DISCARD to blocking ASTs */
@@ -418,9 +421,9 @@ do {                                                                          \
         if (lock->l_resource->lr_type == LDLM_FLOCK) {                        \
                 CDEBUG(level, "### " format                                   \
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
-                       "res: "LPU64"/"LPU64"/"LPU64" rrc: %d type: %s pid: "  \
-                      LPU64" " "["LPU64"->"LPU64"] flags: %x remote: "LPX64  \
-                       " expref: %d pid: %u\n" , ## a,                        \
+                       "res: "LPU64"/"LPU64"/"LPU64" rrc: %d type: %s "       \
+                      "pid: "LPU64" nid: "LPU64" ["LPU64"->"LPU64"] "        \
+                       "flags: %x remote: "LPX64" expref: %d pid: %u\n", ## a,\
                        lock->l_resource->lr_namespace->ns_name, lock,         \
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),   \
                        lock->l_readers, lock->l_writers,                      \
@@ -432,6 +435,7 @@ do {                                                                          \
                        atomic_read(&lock->l_resource->lr_refcount),           \
                        ldlm_typename[lock->l_resource->lr_type],              \
                        lock->l_policy_data.l_flock.pid,                       \
+                       lock->l_policy_data.l_flock.nid,                       \
                        lock->l_policy_data.l_flock.start,                     \
                        lock->l_policy_data.l_flock.end,                       \
                        lock->l_flags, lock->l_remote_handle.cookie,           \
@@ -523,6 +527,7 @@ void ldlm_change_cbdata(struct ldlm_namespace *, struct ldlm_res_id *,
 
 /* ldlm_flock.c */
 int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data);
+int ldlm_handle_flock_deadlock_check(struct ptlrpc_request *req);
 
 /* ldlm_extent.c */
 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms);
@@ -683,6 +688,20 @@ static inline void check_res_locked(struct ldlm_resource *res)
         LASSERT_SPIN_LOCKED(&res->lr_lock);
 }
 
+static inline void lock_bitlock(struct ldlm_lock *lock)
+{
+        bit_spin_lock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags);
+        LASSERT(lock->l_pidb == 0);
+        lock->l_pidb = current->pid;
+}
+
+static inline void unlock_bitlock(struct ldlm_lock *lock)
+{
+        LASSERT(lock->l_pidb == current->pid);
+        lock->l_pidb = 0;
+        bit_spin_unlock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags);
+}
+
 struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock);
 void unlock_res_and_lock(struct ldlm_lock *lock);
 
index 3b0e1ea..052debc 100644 (file)
@@ -901,12 +901,13 @@ struct lmv_desc {
  */
 /* opcodes -- MUST be distinct from OST/MDS opcodes */
 typedef enum {
-        LDLM_ENQUEUE     = 101,
-        LDLM_CONVERT     = 102,
-        LDLM_CANCEL      = 103,
-        LDLM_BL_CALLBACK = 104,
-        LDLM_CP_CALLBACK = 105,
-        LDLM_GL_CALLBACK = 106,
+        LDLM_ENQUEUE          = 101,
+        LDLM_CONVERT          = 102,
+        LDLM_CANCEL           = 103,
+        LDLM_BL_CALLBACK      = 104,
+        LDLM_CP_CALLBACK      = 105,
+        LDLM_GL_CALLBACK      = 106,
+        LDLM_FLK_DEADLOCK_CHK = 107,
         LDLM_LAST_OPC
 } ldlm_cmd_t;
 #define LDLM_FIRST_OPC LDLM_ENQUEUE
@@ -942,8 +943,9 @@ struct ldlm_flock {
         __u64 start;
         __u64 end;
         __u64 pid;
+        __u64 nid;
         __u64 blocking_pid;
-        __u64 blocking_export;
+        __u64 blocking_nid;
 };
 
 /* it's important that the fields of the ldlm_extent structure match
index 3c9f081..bfb480b 100644 (file)
@@ -615,6 +615,22 @@ do {                                                                           \
             set_current_state(TASK_INTERRUPTIBLE);                             \
             if (condition)                                                     \
                     break;                                                     \
+            if (signal_pending(current)) {                                     \
+                    if (!info->lwi_timeout || __timed_out) {                   \
+                            break;                                             \
+                    } else {                                                   \
+                            /* We have to do this here because some signals */ \
+                            /* are not blockable - ie from strace(1).       */ \
+                            /* In these cases we want to schedule_timeout() */ \
+                            /* again, because we don't want that to return  */ \
+                            /* -EINTR when the RPC actually succeeded.      */ \
+                            /* the RECALC_SIGPENDING below will deliver the */ \
+                            /* signal properly.                             */ \
+                            SIGNAL_MASK_LOCK(current, irqflags);               \
+                            CLEAR_SIGPENDING;                                  \
+                            SIGNAL_MASK_UNLOCK(current, irqflags);             \
+                    }                                                          \
+            }                                                                  \
             if (info->lwi_timeout && !__timed_out) {                           \
                 timeout_remaining = schedule_timeout(timeout_remaining);       \
                 if (timeout_remaining == 0) {                                  \
@@ -631,24 +647,6 @@ do {                                                                           \
             } else {                                                           \
                 schedule();                                                    \
             }                                                                  \
-            if (condition)                                                     \
-                    break;                                                     \
-            if (signal_pending(current)) {                                     \
-                    if (__timed_out) {                                         \
-                            break;                                             \
-                    } else {                                                   \
-                            /* We have to do this here because some signals */ \
-                            /* are not blockable - ie from strace(1).       */ \
-                            /* In these cases we want to schedule_timeout() */ \
-                            /* again, because we don't want that to return  */ \
-                            /* -EINTR when the RPC actually succeeded.      */ \
-                            /* the RECALC_SIGPENDING below will deliver the */ \
-                            /* signal properly.                             */ \
-                            SIGNAL_MASK_LOCK(current, irqflags);               \
-                            CLEAR_SIGPENDING;                                  \
-                            SIGNAL_MASK_UNLOCK(current, irqflags);             \
-                    }                                                          \
-            }                                                                  \
         }                                                                      \
                                                                                \
         SIGNAL_MASK_LOCK(current, irqflags);                                   \
@@ -656,7 +654,7 @@ do {                                                                           \
         RECALC_SIGPENDING;                                                     \
         SIGNAL_MASK_UNLOCK(current, irqflags);                                 \
                                                                                \
-        if (__timed_out && signal_pending(current)) {                          \
+        if ((!info->lwi_timeout || __timed_out) && signal_pending(current)) {  \
                 if (info->lwi_on_signal)                                       \
                         info->lwi_on_signal(info->lwi_cb_data);                \
                 ret = -EINTR;                                                  \
index fb41ccb..b652097 100644 (file)
@@ -64,21 +64,12 @@ struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock)
                 return res;
         } 
 
-        bit_spin_lock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags);
-        LASSERT(lock->l_pidb == 0);
+        lock_bitlock(lock);
         res = lock->l_resource;
-        lock->l_pidb = current->pid;
         lock_res(res);
         return res;
 }
 
-void unlock_bitlock(struct ldlm_lock *lock)
-{
-        LASSERT(lock->l_pidb == current->pid);
-        lock->l_pidb = 0;
-        bit_spin_unlock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags);
-}
-
 void unlock_res_and_lock(struct ldlm_lock *lock)
 {
         struct ldlm_resource *res = lock->l_resource;
index a86c021..cbeacfa 100644 (file)
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
+/*
+ * 2003 - 2005 Copyright, Hewlett-Packard Development Company, LP.
+ *
+ * Developed under the sponsorship of the U.S. Government
+ *     under Subcontract No. B514193
+ */
+
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
 
 #include "ldlm_internal.h"
 
-#define l_flock_waitq   l_lru
-
 static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
-
-int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                            void *data, int flag);
+static int ldlm_deadlock_timeout = 30 * HZ;
 
 /**
  * list_for_remaining_safe - iterate over the remaining entries in a list
@@ -58,7 +61,8 @@ ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
         return((new->l_policy_data.l_flock.pid ==
                 lock->l_policy_data.l_flock.pid) &&
-               (new->l_export == lock->l_export));
+               (new->l_policy_data.l_flock.nid ==
+                lock->l_policy_data.l_flock.nid));
 }
 
 static inline int
@@ -78,10 +82,12 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                    mode, flags);
 
-        LASSERT(list_empty(&lock->l_flock_waitq));
+        /* don't need to take the locks here because the lock
+         * is on a local destroy list, not the resource list. */
         list_del_init(&lock->l_res_link);
+
         if (flags == LDLM_FL_WAIT_NOREPROC) {
-                /* client side - set a flag to prevent sending a CANCEL */
+                /* client side - set flags to prevent sending a CANCEL */
                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
                 ldlm_lock_decref_internal(lock, mode);
         }
@@ -90,73 +96,44 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         EXIT;
 }
 
-static int
-ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
-{
-        struct obd_export *req_export = req->l_export;
-        struct obd_export *blocking_export = blocking_lock->l_export;
-        pid_t req_pid = req->l_policy_data.l_flock.pid;
-        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
-        struct ldlm_lock *lock;
-
-restart:
-        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
-                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
-                    (lock->l_export != blocking_export))
-                        continue;
-
-                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
-                blocking_export = (struct obd_export *)(long)
-                        lock->l_policy_data.l_flock.blocking_export;
-                if (blocking_pid == req_pid && blocking_export == req_export)
-                        return 1;
-
-                goto restart;
-        }
-
-        return 0;
-}
-
 int
 ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         ldlm_error_t *err, struct list_head *work_list)
 {
+        struct list_head destroy_list = LIST_HEAD_INIT(destroy_list);
         struct ldlm_resource *res = req->l_resource;
         struct ldlm_namespace *ns = res->lr_namespace;
-        struct list_head *tmp;
-        struct list_head *ownlocks = NULL;
-        struct ldlm_lock *lock = NULL;
+        struct list_head *pos;
+        struct list_head *tmp = NULL;
+        struct ldlm_lock *lock;
         struct ldlm_lock *new = req;
-        struct ldlm_lock *new2 = NULL;
+        struct ldlm_lock *new2;
         ldlm_mode_t mode = req->l_req_mode;
-        int local = ns->ns_client;
         int added = (mode == LCK_NL);
         int overlaps = 0;
+        int rc = LDLM_ITER_CONTINUE;
+        int i = 0;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "
-               LPU64"\n", *flags, (unsigned int)new->l_policy_data.l_flock.pid, 
-              mode, req->l_policy_data.l_flock.start,
+        CDEBUG(D_DLMTRACE, "flags %#x mode %u pid "LPU64" nid "LPU64" "
+               "start "LPU64" end "LPU64"\n", *flags, mode,
+               req->l_policy_data.l_flock.pid, 
+               req->l_policy_data.l_flock.nid, 
+              req->l_policy_data.l_flock.start,
                req->l_policy_data.l_flock.end);
 
         *err = ELDLM_OK;
 
-        if (local) {
-                /* No blocking ASTs are sent to the clients for
-                 * Posix file & record locks */
-                req->l_blocking_ast = NULL;
-        } else {
-                /* Called on the server for lock cancels. */
-                req->l_blocking_ast = ldlm_flock_blocking_ast;
-        }
+        /* No blocking ASTs are sent for Posix file & record locks */
+        req->l_blocking_ast = NULL;
 
         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                 /* This loop determines where this processes locks start
                  * in the resource lr_granted list. */
-                list_for_each(tmp, &res->lr_granted) {
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                list_for_each(pos, &res->lr_granted) {
+                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
                         if (ldlm_same_flock_owner(lock, req)) {
-                                ownlocks = tmp;
+                                tmp = pos;
                                 break;
                         }
                 }
@@ -165,12 +142,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
 
                 /* This loop determines if there are existing locks
                  * that conflict with the new lock request. */
-                list_for_each(tmp, &res->lr_granted) {
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                list_for_each(pos, &res->lr_granted) {
+                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
 
                         if (ldlm_same_flock_owner(lock, req)) {
-                                if (!ownlocks)
-                                        ownlocks = tmp;
+                                if (!tmp)
+                                        tmp = pos;
                                 continue;
                         }
 
@@ -181,42 +158,39 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         if (!ldlm_flocks_overlap(lock, req))
                                 continue;
 
+                        /* deadlock detection will be postponed
+                         * until ldlm_flock_completion_ast(). */
+
+                        *flags |= LDLM_FL_LOCK_CHANGED;
+
+                        req->l_policy_data.l_flock.blocking_pid =
+                                lock->l_policy_data.l_flock.pid;
+                        req->l_policy_data.l_flock.blocking_nid =
+                                lock->l_policy_data.l_flock.nid;
+
                         if (!first_enq)
                                 RETURN(LDLM_ITER_CONTINUE);
 
                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                ldlm_flock_destroy(req, mode, *flags);
+                                list_move(&req->l_res_link, &destroy_list);
                                 *err = -EAGAIN;
-                                RETURN(LDLM_ITER_STOP);
+                                GOTO(out, rc = LDLM_ITER_STOP);
                         }
 
                         if (*flags & LDLM_FL_TEST_LOCK) {
-                                ldlm_flock_destroy(req, mode, *flags);
                                 req->l_req_mode = lock->l_granted_mode;
                                 req->l_policy_data.l_flock.pid =
                                         lock->l_policy_data.l_flock.pid;
+                                req->l_policy_data.l_flock.nid =
+                                        lock->l_policy_data.l_flock.nid;
                                 req->l_policy_data.l_flock.start =
                                         lock->l_policy_data.l_flock.start;
                                 req->l_policy_data.l_flock.end =
                                         lock->l_policy_data.l_flock.end;
-                                *flags |= LDLM_FL_LOCK_CHANGED;
-                                RETURN(LDLM_ITER_STOP);
+                                list_move(&req->l_res_link, &destroy_list);
+                                GOTO(out, rc = LDLM_ITER_STOP);
                         }
 
-                        if (ldlm_flock_deadlock(req, lock)) {
-                                ldlm_flock_destroy(req, mode, *flags);
-                                *err = -EDEADLK;
-                                RETURN(LDLM_ITER_STOP);
-                        }
-
-                        req->l_policy_data.l_flock.blocking_pid =
-                                lock->l_policy_data.l_flock.pid;
-                        req->l_policy_data.l_flock.blocking_export =
-                                (long)(void *)lock->l_export;
-
-                        LASSERT(list_empty(&req->l_flock_waitq));
-                        list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
-
                         ldlm_resource_add_lock(res, &res->lr_waiting, req);
                         *flags |= LDLM_FL_BLOCK_GRANTED;
                         RETURN(LDLM_ITER_STOP);
@@ -224,24 +198,18 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         }
 
         if (*flags & LDLM_FL_TEST_LOCK) {
-                ldlm_flock_destroy(req, mode, *flags);
                 req->l_req_mode = LCK_NL;
                 *flags |= LDLM_FL_LOCK_CHANGED;
-                RETURN(LDLM_ITER_STOP);
+                list_move(&req->l_res_link, &destroy_list);
+                GOTO(out, rc = LDLM_ITER_STOP);
         }
 
-        /* In case we had slept on this lock request take it off of the
-         * deadlock detection waitq. */
-        list_del_init(&req->l_flock_waitq);
-
         /* Scan the locks owned by this process that overlap this request.
          * We may have to merge or split existing locks. */
+        pos = (tmp != NULL) ? tmp : &res->lr_granted;
 
-        if (!ownlocks)
-                ownlocks = &res->lr_granted;
-
-        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
-                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
+        list_for_remaining_safe(pos, tmp, &res->lr_granted) {
+                lock = list_entry(pos, struct ldlm_lock, l_res_link);
 
                 if (!ldlm_same_flock_owner(lock, new))
                         break;
@@ -280,7 +248,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         }
 
                         if (added) {
-                                ldlm_flock_destroy(lock, mode, *flags);
+                                list_move(&lock->l_res_link, &destroy_list);
                         } else {
                                 new = lock;
                                 added = 1;
@@ -306,7 +274,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                         new->l_policy_data.l_flock.end + 1;
                                 break;
                         }
-                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
+                        list_move(&lock->l_res_link, &destroy_list);
                         continue;
                 }
                 if (new->l_policy_data.l_flock.end >=
@@ -332,14 +300,16 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                         lock->l_granted_mode, NULL, NULL, NULL,
                                         NULL, 0);
                 if (!new2) {
-                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
+                        list_move(&req->l_res_link, &destroy_list);
                         *err = -ENOLCK;
-                        RETURN(LDLM_ITER_STOP);
+                        GOTO(out, rc = LDLM_ITER_STOP);
                 }
 
                 new2->l_granted_mode = lock->l_granted_mode;
                 new2->l_policy_data.l_flock.pid =
                         new->l_policy_data.l_flock.pid;
+                new2->l_policy_data.l_flock.nid =
+                        new->l_policy_data.l_flock.nid;
                 new2->l_policy_data.l_flock.start =
                         lock->l_policy_data.l_flock.start;
                 new2->l_policy_data.l_flock.end =
@@ -353,10 +323,11 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                  &new2->l_export->exp_ldlm_data.led_held_locks);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC)
-                        ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
+                        ldlm_lock_addref_internal_nolock(new2,
+                                                         lock->l_granted_mode);
 
                 /* insert new2 at lock */
-                ldlm_resource_add_lock(res, ownlocks, new2);
+                ldlm_resource_add_lock(res, pos, new2);
                 LDLM_LOCK_PUT(new2);
                 break;
         }
@@ -364,11 +335,14 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         /* At this point we're granting the lock request. */
         req->l_granted_mode = req->l_req_mode;
 
-        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
-        if (!added) {
+        if (added) {
+                list_move(&req->l_res_link, &destroy_list);
+        } else {
+                /* Add req to the granted queue before calling
+                 * ldlm_reprocess_all() below. */
                 list_del_init(&req->l_res_link);
-                /* insert new lock before ownlocks in list. */
-                ldlm_resource_add_lock(res, ownlocks, req);
+                /* insert new lock before pos in the list. */
+                ldlm_resource_add_lock(res, pos, req);
         }
 
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
@@ -383,11 +357,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                          * but only once because first_enq will be false from
                          * ldlm_reprocess_queue. */
                         if ((mode == LCK_NL) && overlaps) {
-                                struct list_head rpc_list
-                                                    = LIST_HEAD_INIT(rpc_list);
+                                struct list_head rpc_list =
+                                                     LIST_HEAD_INIT(rpc_list);
                                 int rc;
-restart:
-                                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
+ restart:
+                                ldlm_reprocess_queue(res, &res->lr_waiting,
+                                                     &rpc_list);
                                 unlock_res(res);
                                 rc = ldlm_run_cp_ast_work(&rpc_list);
                                 lock_res(res);
@@ -396,19 +371,274 @@ restart:
                        }
                 } else {
                         LASSERT(req->l_completion_ast);
-                        ldlm_add_ast_work_item(req, NULL, NULL);
+                        ldlm_add_ast_work_item(req, NULL, work_list);
+                }
+        }
+
+ out:
+        if (!list_empty(&destroy_list)) {
+                /* FIXME: major hack. when called from ldlm_lock_enqueue()
+                 * the res and the lock are locked. When called from
+                 * ldlm_reprocess_queue() the res is locked but the lock
+                 * is not. */
+                if (added && first_enq && res->lr_namespace->ns_client)
+                        unlock_bitlock(req);
+
+                unlock_res(res);
+
+                CDEBUG(D_DLMTRACE, "Destroy locks:\n");
+
+                list_for_each_safe(pos, tmp, &destroy_list) {
+                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
+                        ldlm_lock_dump(D_DLMTRACE, lock, ++i);
+                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
+                }
+
+                if (added && first_enq && res->lr_namespace->ns_client)
+                        lock_bitlock(req);
+
+                lock_res(res);
+        }
+
+        RETURN(rc);
+}
+
+struct ldlm_sleep_flock {
+        __u64 lsf_pid;
+        __u64 lsf_nid;
+        __u64 lsf_blocking_pid;
+        __u64 lsf_blocking_nid;
+        struct list_head lsf_list;
+};
+
+int
+ldlm_handle_flock_deadlock_check(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        struct ldlm_sleep_flock *lsf;
+        struct list_head *pos;
+        __u64 pid, nid, blocking_pid, blocking_nid;
+        unsigned int flags;
+        int rc = 0;
+        ENTRY;
+
+        req->rq_status = 0;
+
+        dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
+                                      lustre_swab_ldlm_request);
+        if (dlm_req == NULL) {
+                CERROR("bad request buffer for flock deadlock check\n");
+                RETURN(-EFAULT);
+        }
+
+        flags = dlm_req->lock_flags;
+        pid = dlm_req->lock_desc.l_policy_data.l_flock.pid;
+        nid = dlm_req->lock_desc.l_policy_data.l_flock.nid;
+        blocking_pid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_pid;
+        blocking_nid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_nid;
+
+        CDEBUG(D_DLMTRACE, "flags: 0x%x req: pid: "LPU64" nid "LPU64" "
+               "blk: pid: "LPU64" nid: "LPU64"\n",
+               dlm_req->lock_flags, pid, nid, blocking_pid, blocking_nid);
+
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                struct ldlm_lock *lock;
+                struct ldlm_reply *dlm_rep;
+                int size = sizeof(*dlm_rep);
+                
+                lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+                if (!lock) {
+                        CERROR("received deadlock check for unknown lock "
+                               "cookie "LPX64" from client %s id %s\n",
+                               dlm_req->lock_handle1.cookie,
+                               req->rq_export->exp_client_uuid.uuid,
+                               req->rq_peerstr);
+                        req->rq_status = -ESTALE;
+                        RETURN(0);
+                }
+
+                lock_res_and_lock(lock);
+                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
+                blocking_nid = lock->l_policy_data.l_flock.blocking_nid;
+                unlock_res_and_lock(lock);
+
+                rc = lustre_pack_reply(req, 1, &size, NULL);
+                if (rc) {
+                        CERROR("lustre_pack_reply failed: rc = %d\n", rc);
+                        req->rq_status = rc;
+                        RETURN(0);
+                }
+
+                dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*dlm_rep));
+                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_pid =
+                        blocking_pid;
+                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_nid =
+                        blocking_nid;
+        } else {
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+        }
+
+        if (flags & LDLM_FL_DEADLOCK_CHK) {
+                __u64 orig_blocking_pid = blocking_pid;
+                __u64 orig_blocking_nid = blocking_nid;
+ restart:
+                list_for_each(pos, &ldlm_flock_waitq) {
+                        lsf = list_entry(pos,struct ldlm_sleep_flock,lsf_list);
+
+                        /* We want to return a deadlock condition for the
+                         * last lock on the waitq that created the deadlock
+                         * situation. Posix verification suites expect this
+                         * behavior. We'll stop if we haven't found a deadlock
+                         * up to the point where the current process is queued
+                         * to let the last lock on the queue that's in the
+                         * deadlock loop detect the deadlock. In this case
+                         * just update the blocking info. */
+                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
+                                lsf->lsf_blocking_pid = blocking_pid;
+                                lsf->lsf_blocking_nid = blocking_nid;
+                                break;
+                        }
+
+                        if ((lsf->lsf_pid != blocking_pid) ||
+                            (lsf->lsf_nid != blocking_nid))
+                                continue;
+
+                        blocking_pid = lsf->lsf_blocking_pid;
+                        blocking_nid = lsf->lsf_blocking_nid;
+
+                        if (blocking_pid == pid && blocking_nid == nid){
+                                req->rq_status = -EDEADLOCK;
+                                flags |= LDLM_FL_DEADLOCK_DEL;
+                                break;
+                        }
+
+                        goto restart;
+                }
+
+                /* If we got all the way thru the list then we're not on it. */
+                if (pos == &ldlm_flock_waitq) {
+                        OBD_ALLOC(lsf, sizeof(*lsf));
+                        if (!lsf)
+                                RETURN(-ENOSPC);
+
+                        lsf->lsf_pid = pid;
+                        lsf->lsf_nid = nid;
+                        lsf->lsf_blocking_pid = orig_blocking_pid;
+                        lsf->lsf_blocking_nid = orig_blocking_nid;
+                        list_add_tail(&lsf->lsf_list, &ldlm_flock_waitq);
+                }
+        }
+        
+        if (flags & LDLM_FL_DEADLOCK_DEL) {
+                list_for_each_entry(lsf, &ldlm_flock_waitq, lsf_list) {
+                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
+                                list_del_init(&lsf->lsf_list);
+                                OBD_FREE(lsf, sizeof(*lsf));
+                                break;
+                        }
                 }
         }
 
-        /* In case we're reprocessing the requested lock we can't destroy
-         * it until after calling ldlm_ast_work_item() above so that lawi()
-         * can bump the reference count on req. Otherwise req could be freed
-         * before the completion AST can be sent.  */
-        if (added)
-                ldlm_flock_destroy(req, mode, *flags);
+        RETURN(rc);
+}
+
+int
+ldlm_send_flock_deadlock_check(struct obd_device *obd, struct ldlm_lock *lock,
+                               unsigned int flags)
+{
+        struct obd_import *imp;
+        struct ldlm_request *body;
+        struct ldlm_reply *reply;
+        struct ptlrpc_request *req;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        CDEBUG(D_DLMTRACE, "obd: %p flags: 0x%x\n", obd, flags);
+
+        imp = obd->u.cli.cl_import;
+        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_FLK_DEADLOCK_CHK, 1,
+                              &size, NULL);
+        if (!req)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        body->lock_flags = flags;
+        ldlm_lock2desc(lock, &body->lock_desc);
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                size = sizeof(*reply);
+                req->rq_replen = lustre_msg_size(1, &size);
+        } else {
+                req->rq_replen = lustre_msg_size(0, NULL);
+        }
+
+        rc = ptlrpc_queue_wait(req);
+        if (rc != ELDLM_OK)
+                GOTO(out, rc);
+
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                           lustre_swab_ldlm_reply);
+                if (reply == NULL) {
+                        CERROR ("Can't unpack ldlm_reply\n");
+                        GOTO (out, rc = -EPROTO);
+                }
+
+                lock->l_policy_data.l_flock.blocking_pid =
+                        reply->lock_desc.l_policy_data.l_flock.blocking_pid;
+                lock->l_policy_data.l_flock.blocking_nid =
+                        reply->lock_desc.l_policy_data.l_flock.blocking_nid;
+
+                CDEBUG(D_DLMTRACE, "LDLM_FL_GET_BLOCKING: pid: "LPU64" "
+                       "nid: "LPU64" blk: pid: "LPU64" nid: "LPU64"\n",
+                       lock->l_policy_data.l_flock.pid,
+                       lock->l_policy_data.l_flock.nid,
+                       lock->l_policy_data.l_flock.blocking_pid,
+                       lock->l_policy_data.l_flock.blocking_nid);
+        }
 
-        ldlm_resource_dump(D_OTHER, res);
-        RETURN(LDLM_ITER_CONTINUE);
+        rc = req->rq_status;
+ out:
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
+int
+ldlm_flock_deadlock_check(struct obd_device *master_obd, struct obd_device *obd,
+                          struct ldlm_lock *lock)
+{
+        unsigned int flags = 0;
+        int rc;
+        ENTRY;
+
+        if (obd == NULL) {
+                /* Delete this process from the sleeplock list. */
+                flags = LDLM_FL_DEADLOCK_DEL;
+                rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);
+                RETURN(rc);
+        }
+
+        flags = LDLM_FL_GET_BLOCKING;
+        if (obd == master_obd)
+                flags |= LDLM_FL_DEADLOCK_CHK;
+
+        rc = ldlm_send_flock_deadlock_check(obd, lock, flags);
+        CDEBUG(D_DLMTRACE, "1st check: rc: %d flags: 0x%x\n", rc, flags);
+        if (rc || (flags & LDLM_FL_DEADLOCK_CHK))
+                RETURN(rc);
+
+        CDEBUG(D_DLMTRACE, "about to send 2nd check: master: %p.\n",
+                master_obd);
+
+        flags = LDLM_FL_DEADLOCK_CHK;
+
+        rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);
+
+        CDEBUG(D_DLMTRACE, "2nd check: rc: %d flags: 0x%x\n", rc, flags);
+
+        RETURN(rc);
 }
 
 struct ldlm_flock_wait_data {
@@ -425,11 +655,10 @@ ldlm_flock_interrupted_wait(void *data)
 
         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
 
-        /* take lock off the deadlock detection waitq. */
-        list_del_init(&lock->l_flock_waitq);
-
         /* client side - set flag to prevent lock from being put on lru list */
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
+        unlock_res_and_lock(lock);
 
         ldlm_lock_decref_internal(lock, lock->l_req_mode);
         ldlm_lock2handle(lock, &lockh);
@@ -440,20 +669,17 @@ ldlm_flock_interrupted_wait(void *data)
 int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
-        struct ldlm_namespace *ns;
-        struct file_lock *getlk = lock->l_ast_data;
         struct ldlm_flock_wait_data fwd;
         unsigned long irqflags;
         struct obd_device *obd;
+        struct obd_device *master_obd = (struct obd_device *)lock->l_ast_data;
         struct obd_import *imp = NULL;
         ldlm_error_t err;
+        int deadlock_checked = 0;
         int rc = 0;
         struct l_wait_info lwi;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
-               flags, data, getlk);
-
         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
@@ -467,6 +693,9 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         fwd.fwd_lock = lock;
         obd = class_exp2obd(lock->l_conn_export);
 
+        CDEBUG(D_DLMTRACE, "flags: 0x%x master: %p obd: %p\n",
+               flags, master_obd, obd);
+
         /* if this is a local lock, then there is no import */
         if (obd != NULL)
                 imp = obd->u.cli.cl_import;
@@ -477,46 +706,42 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 spin_unlock_irqrestore(&imp->imp_lock, irqflags);
         }
 
-        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
+        lwi = LWI_TIMEOUT_INTR(ldlm_deadlock_timeout, NULL,
+                               ldlm_flock_interrupted_wait, &fwd);
 
-        /* Go to sleep until the lock is granted. */
+ restart:
         rc = l_wait_event(lock->l_waitq,
                           ((lock->l_req_mode == lock->l_granted_mode) ||
                            lock->l_destroyed), &lwi);
 
+        if (rc == -ETIMEDOUT) {
+                deadlock_checked = 1;
+                rc = ldlm_flock_deadlock_check(master_obd, obd, lock);
+                if (rc == -EDEADLK)
+                        ldlm_flock_interrupted_wait(&fwd);
+                else {
+                        CDEBUG(D_DLMTRACE, "lock: %p going back to sleep,\n",
+                               lock);
+                        goto restart;
+                }
+        } else {
+                if (deadlock_checked)
+                        ldlm_flock_deadlock_check(master_obd, NULL, lock);
+        }
+
         LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
         RETURN(rc);
  
-granted:
-
+ granted:
         LDLM_DEBUG(lock, "client-side enqueue granted");
-        ns = lock->l_resource->lr_namespace;
-        lock_res(lock->l_resource);
-
-        /* take lock off the deadlock detection waitq. */
-        list_del_init(&lock->l_flock_waitq);
+        lock_res_and_lock(lock);
 
         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
         list_del_init(&lock->l_res_link);
 
         if (flags & LDLM_FL_TEST_LOCK) {
-                /* fcntl(F_GETLK) request */
-                /* The old mode was saved in getlk->fl_type so that if the mode
-                 * in the lock changes we can decref the approprate refcount. */
-                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
-                switch (lock->l_granted_mode) {
-                case LCK_PR:
-                        getlk->fl_type = F_RDLCK;
-                        break;
-                case LCK_PW:
-                        getlk->fl_type = F_WRLCK;
-                        break;
-                default:
-                        getlk->fl_type = F_UNLCK;
-                }
-                getlk->fl_pid = lock->l_policy_data.l_flock.pid;
-                getlk->fl_start = lock->l_policy_data.l_flock.start;
-                getlk->fl_end = lock->l_policy_data.l_flock.end;
+                /* client side - set flag to prevent sending a CANCEL */
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
         } else {
                 int noreproc = LDLM_FL_WAIT_NOREPROC;
 
@@ -526,21 +751,7 @@ granted:
                 if (flags == 0)
                         wake_up(&lock->l_waitq);
         }
-        unlock_res(lock->l_resource);
-        RETURN(0);
-}
 
-int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                            void *data, int flag)
-{
-        ENTRY;
-                                                                                                                             
-        LASSERT(lock);
-        LASSERT(flag == LDLM_CB_CANCELING);
-                                                                                                                             
-        /* take lock off the deadlock detection waitq. */
-        lock_res(lock->l_resource);
-        list_del_init(&lock->l_flock_waitq);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         RETURN(0);
 }
index 13d3be3..48b219d 100644 (file)
@@ -291,8 +291,6 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         RETURN(lock);
 }
 
-void unlock_bitlock(struct ldlm_lock *lock);
-
 int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                               struct ldlm_res_id new_resid)
 {
index 4e32bd4..cfd8c97 100644 (file)
@@ -1054,6 +1054,7 @@ static int ldlm_msg_check_version(struct lustre_msg *msg)
         case LDLM_BL_CALLBACK:
         case LDLM_CP_CALLBACK:
         case LDLM_GL_CALLBACK:
+        case LDLM_FLK_DEADLOCK_CHK:
                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
@@ -1572,6 +1573,7 @@ void __exit ldlm_exit(void)
 
 /* ldlm_flock.c */
 EXPORT_SYMBOL(ldlm_flock_completion_ast);
+EXPORT_SYMBOL(ldlm_handle_flock_deadlock_check);
 
 /* ldlm_extent.c */
 EXPORT_SYMBOL(ldlm_extent_shift_kms);
index f595a9b..7a5e367 100644 (file)
@@ -424,11 +424,13 @@ int ldlm_cli_enqueue(struct obd_export *exp,
         if (!is_replay) {
                 rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
                 if (lock->l_completion_ast != NULL) {
+                        /* since the lock made it to the server at this point
+                         * it's the completion AST's responsibility to clean up
+                         * the lock if the completion processing fails since
+                         * it's no longer a simple local lock cancel. */
                         int err = lock->l_completion_ast(lock, *flags, NULL);
                         if (!rc)
                                 rc = err;
-                        if (rc)
-                                cleanup_phase = 2;
                 }
         }
 
index c993a2e..94ba1db 100644 (file)
@@ -34,6 +34,7 @@
 #endif
 #include <linux/obd_lov.h>
 #include <linux/lustre_audit.h>
+#include <linux/lustre_net.h>
 #include "llite_internal.h"
 
 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
@@ -1724,33 +1725,35 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
         RETURN(rc);
 }
 
-int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
+int ll_file_flock(struct file *file, int cmd, struct file_lock *fl)
 {
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_inode_info *li = ll_i2info(inode);
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct obd_device *obddev;
-        struct ldlm_res_id res_id =
-                { .name = {id_fid(&li->lli_id), id_group(&li->lli_id), LDLM_FLOCK} };
+        struct obd_device *obd = md_get_real_obd(sbi->ll_md_exp, &li->lli_id);
+        struct ldlm_res_id res_id = { .name = {id_fid(&li->lli_id),
+                                      id_group(&li->lli_id), LDLM_FLOCK} };
         struct lustre_handle lockh = {0};
+        struct ptlrpc_connection *conn;
+        ptl_process_id_t ptlpid;
         ldlm_policy_data_t flock;
         ldlm_mode_t mode = 0;
         int flags = 0;
         int rc;
         ENTRY;
 
-        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
-               inode->i_ino, file_lock);
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu cmd=%d file_lock=%p\n",
+               inode->i_ino, cmd, fl);
 
-        flock.l_flock.pid = file_lock->fl_pid;
-        flock.l_flock.start = file_lock->fl_start;
-        flock.l_flock.end = file_lock->fl_end;
+        if (!(fl->fl_flags & FL_POSIX))
+                RETURN(-ENOSYS);
 
-        switch (file_lock->fl_type) {
+        switch (fl->fl_type) {
         case F_RDLCK:
                 mode = LCK_PR;
                 break;
         case F_UNLCK:
+
                 /* An unlock request may or may not have any relation to
                  * existing locks so we may not be able to pass a lock handle
                  * via a normal ldlm_lock_cancel() request. The request may even
@@ -1765,22 +1768,21 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                 mode = LCK_PW;
                 break;
         default:
-                CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
-                LBUG();
+                CERROR("unknown fcntl lock type: %d\n", fl->fl_type);
+                RETURN(-EINVAL);
         }
 
         switch (cmd) {
-        case F_SETLKW:
-#ifdef F_SETLKW64
-        case F_SETLKW64:
-#endif
-                flags = 0;
-                break;
         case F_SETLK:
 #ifdef F_SETLK64
         case F_SETLK64:
 #endif
-                flags = LDLM_FL_BLOCK_NOWAIT;
+                flags |= LDLM_FL_BLOCK_NOWAIT;
+                break;
+        case F_SETLKW:
+#ifdef F_SETLKW64
+        case F_SETLKW64:
+#endif
                 break;
         case F_GETLK:
 #ifdef F_GETLK64
@@ -1789,23 +1791,88 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                 flags = LDLM_FL_TEST_LOCK;
                 /* Save the old mode so that if the mode in the lock changes we
                  * can decrement the appropriate reader or writer refcount. */
-                file_lock->fl_type = mode;
+                fl->fl_type = mode;
                 break;
         default:
                 CERROR("unknown fcntl lock command: %d\n", cmd);
-                LBUG();
+                RETURN(-EINVAL);
         }
 
+        /* Since we're called on every close to remove any outstanding Posix
+         * flocks owned by the process it's worth a little effort to avoid
+         * the RPCs if there are no flocks on this file from this node. */
+        if (mode == LCK_NL && fl->fl_start == 0 && fl->fl_end >= OFFSET_MAX) {
+                struct ldlm_resource *res;
+
+                res = ldlm_resource_get(obd->obd_namespace, NULL,
+                                        res_id, LDLM_FLOCK, 0);
+                if (res == NULL)
+                        RETURN(0);
+
+                ldlm_resource_putref(res);
+        }
+
+        conn = class_exp2cliimp(obd->obd_self_export)->imp_connection;
+        if (!conn || !conn->c_peer.peer_ni) 
+                RETURN(-ENOTCONN);
+                
+        rc = PtlGetId(conn->c_peer.peer_ni->pni_ni_h, &ptlpid);
+        if (rc != PTL_OK)
+                RETURN(-ENOTCONN);
+
+        flock.l_flock.start = fl->fl_start;
+        flock.l_flock.end = fl->fl_end;
+        /* XXX - ptlpid.pid is currently coming back a constant; i.e. 12345. */
+        flock.l_flock.pid = fl->fl_pid;
+        flock.l_flock.nid = ptlpid.nid;
+        flock.l_flock.blocking_pid = 0;
+        flock.l_flock.blocking_nid = 0;
+
         CDEBUG(D_DLMTRACE, "inode=%lu, pid="LPU64", flags=%#x, mode=%u, "
                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
                flags, mode, flock.l_flock.start, flock.l_flock.end);
 
-        obddev = md_get_real_obd(sbi->ll_md_exp, &li->lli_id);
-        rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
-                              obddev->obd_namespace,
+        rc = ldlm_cli_enqueue(obd->obd_self_export, NULL, obd->obd_namespace,
                               res_id, LDLM_FLOCK, &flock, mode, &flags,
-                              NULL, ldlm_flock_completion_ast, NULL, file_lock,
+                              NULL, ldlm_flock_completion_ast, NULL,
+                              md_get_real_obd(sbi->ll_md_exp, &sbi->ll_rootid),
                               NULL, 0, NULL, &lockh);
+
+        if (flags & LDLM_FL_TEST_LOCK) {
+                struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
+
+                fl->fl_start = lock->l_policy_data.l_flock.start;
+                fl->fl_end = lock->l_policy_data.l_flock.end;
+                fl->fl_pid = lock->l_policy_data.l_flock.pid;
+
+                switch (lock->l_granted_mode) {
+                case LCK_PR:
+                        fl->fl_type = F_RDLCK;
+                        break;
+                case LCK_PW:
+                        fl->fl_type = F_WRLCK;
+                        break;
+                case LCK_NL:
+                        fl->fl_type = F_UNLCK;
+                        break;
+                default:
+                        CERROR("unexpected lock type: %d returned from server."
+                               "\n", lock->l_granted_mode);
+                        rc = -EINVAL;
+                        break;
+                }
+
+                /* offset the addref() done by ldlm_handle2lock() above. */
+                LDLM_LOCK_PUT(lock);
+                ldlm_lock_decref(&lockh, mode);
+                ldlm_cli_cancel(&lockh);
+
+        /* the LDLM_FL_CBPENDING flag was set in the lock by the
+                 * completion AST so the ldlm_lock_decref() call above
+                 * scheduled a blocking AST which will do the final
+                 * lock put on the lock. */
+        }
+
         RETURN(rc);
 }
 
index 8cdddeb..3f1a247 100644 (file)
@@ -2927,6 +2927,7 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         case LDLM_CONVERT:
         case LDLM_BL_CALLBACK:
         case LDLM_CP_CALLBACK:
+        case LDLM_FLK_DEADLOCK_CHK:
                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
@@ -3237,6 +3238,10 @@ int mds_handle(struct ptlrpc_request *req)
                 LBUG();
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                 break;
+        case LDLM_FLK_DEADLOCK_CHK:
+                DEBUG_REQ(D_INODE, req, "flock deadlock check");
+                rc = ldlm_handle_flock_deadlock_check(req);
+                break;
         case LLOG_ORIGIN_HANDLE_OPEN:
                 DEBUG_REQ(D_INODE, req, "llog_init");
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
index 995aec2..276a5e5 100644 (file)
@@ -33,56 +33,57 @@ struct ll_rpc_opcode {
      __u32       opcode;
      const char *opname;
 } ll_rpc_opcode_table[LUSTRE_MAX_OPCODES] = {
-        { OST_REPLY,        "ost_reply" },
-        { OST_GETATTR,      "ost_getattr" },
-        { OST_SETATTR,      "ost_setattr" },
-        { OST_READ,         "ost_read" },
-        { OST_WRITE,        "ost_write" },
-        { OST_CREATE ,      "ost_create" },
-        { OST_DESTROY,      "ost_destroy" },
-        { OST_GET_INFO,     "ost_get_info" },
-        { OST_CONNECT,      "ost_connect" },
-        { OST_DISCONNECT,   "ost_disconnect" },
-        { OST_PUNCH,        "ost_punch" },
-        { OST_OPEN,         "ost_open" },
-        { OST_CLOSE,        "ost_close" },
-        { OST_STATFS,       "ost_statfs" },
-        { OST_SAN_READ,     "ost_san_read" },
-        { OST_SAN_WRITE,    "ost_san_write" },
-        { OST_SYNC,         "ost_sync" },
-        { OST_SET_INFO,     "ost_set_info" },
-        { MDS_GETATTR,      "mds_getattr" },
-        { MDS_GETATTR_LOCK, "mds_getattr_lock" },
-        { MDS_CLOSE,        "mds_close" },
-        { MDS_REINT,        "mds_reint" },
-        { MDS_READPAGE,     "mds_readpage" },
-        { MDS_CONNECT,      "mds_connect" },
-        { MDS_DISCONNECT,   "mds_disconnect" },
-        { MDS_GETSTATUS,    "mds_getstatus" },
-        { MDS_STATFS,       "mds_statfs" },
-        { MDS_PIN,          "mds_pin" },
-        { MDS_UNPIN,        "mds_unpin" },
-        { MDS_SYNC,         "mds_sync" },
-        { MDS_DONE_WRITING, "mds_done_writing" },
-        { MDS_ACCESS_CHECK, "mds_access_check"},
-        { MDS_PARSE_ID,     "mds_parse_id" },
-        { LDLM_ENQUEUE,     "ldlm_enqueue" },
-        { LDLM_CONVERT,     "ldlm_convert" },
-        { LDLM_CANCEL,      "ldlm_cancel" },
-        { LDLM_BL_CALLBACK, "ldlm_bl_callback" },
-        { LDLM_CP_CALLBACK, "ldlm_cp_callback" },
-        { LDLM_GL_CALLBACK, "ldlm_gl_callback" },
-        { PTLBD_QUERY,      "ptlbd_query" },
-        { PTLBD_READ,       "ptlbd_read" },
-        { PTLBD_WRITE,      "ptlbd_write" },
-        { PTLBD_FLUSH,      "ptlbd_flush" },
-        { PTLBD_CONNECT,    "ptlbd_connect" },
-        { PTLBD_DISCONNECT, "ptlbd_disconnect" },
-        { OBD_PING,         "obd_ping" },
-        { OBD_LOG_CANCEL,   "llog_origin_handle_cancel"},
-        { SEC_INIT,         "sec_init"},
-        { SEC_INIT_CONTINUE,"sec_init_continue"},
-        { SEC_FINI,         "sec_fini"},
+        { OST_REPLY,             "ost_reply" },
+        { OST_GETATTR,           "ost_getattr" },
+        { OST_SETATTR,           "ost_setattr" },
+        { OST_READ,              "ost_read" },
+        { OST_WRITE,             "ost_write" },
+        { OST_CREATE ,           "ost_create" },
+        { OST_DESTROY,           "ost_destroy" },
+        { OST_GET_INFO,          "ost_get_info" },
+        { OST_CONNECT,           "ost_connect" },
+        { OST_DISCONNECT,        "ost_disconnect" },
+        { OST_PUNCH,             "ost_punch" },
+        { OST_OPEN,              "ost_open" },
+        { OST_CLOSE,             "ost_close" },
+        { OST_STATFS,            "ost_statfs" },
+        { OST_SAN_READ,          "ost_san_read" },
+        { OST_SAN_WRITE,         "ost_san_write" },
+        { OST_SYNC,              "ost_sync" },
+        { OST_SET_INFO,          "ost_set_info" },
+        { MDS_GETATTR,           "mds_getattr" },
+        { MDS_GETATTR_LOCK ,     "mds_getattr_lock" },
+        { MDS_CLOSE,             "mds_close" },
+        { MDS_REINT,             "mds_reint" },
+        { MDS_READPAGE,          "mds_readpage" },
+        { MDS_CONNECT,           "mds_connect" },
+        { MDS_DISCONNECT,        "mds_disconnect" },
+        { MDS_GETSTATUS,         "mds_getstatus" },
+        { MDS_STATFS,            "mds_statfs" },
+        { MDS_PIN,               "mds_pin" },
+        { MDS_UNPIN,             "mds_unpin" },
+        { MDS_SYNC,              "mds_sync" },
+        { MDS_DONE_WRITING,      "mds_done_writing" },
+        { MDS_ACCESS_CHECK,      "mds_access_check"},
+        { MDS_PARSE_ID,          "mds_parse_id" },
+        { LDLM_ENQUEUE,          "ldlm_enqueue" },
+        { LDLM_CONVERT,          "ldlm_convert" },
+        { LDLM_CANCEL,           "ldlm_cancel" },
+        { LDLM_BL_CALLBACK,      "ldlm_bl_callback" },
+        { LDLM_CP_CALLBACK,      "ldlm_cp_callback" },
+        { LDLM_GL_CALLBACK,      "ldlm_gl_callback" },
+        { LDLM_FLK_DEADLOCK_CHK, "ldlm_flock_deadlock_check" },
+        { PTLBD_QUERY,           "ptlbd_query" },
+        { PTLBD_READ,            "ptlbd_read" },
+        { PTLBD_WRITE,           "ptlbd_write" },
+        { PTLBD_FLUSH,           "ptlbd_flush" },
+        { PTLBD_CONNECT,         "ptlbd_connect" },
+        { PTLBD_DISCONNECT,      "ptlbd_disconnect" },
+        { OBD_PING,              "obd_ping" },
+        { OBD_LOG_CANCEL,        "llog_origin_handle_cancel"},
+        { SEC_INIT,              "sec_init"},
+        { SEC_INIT_CONTINUE,     "sec_init_continue"},
+        { SEC_FINI,              "sec_fini"},
 };
 
 const char* ll_opcode2str(__u32 opcode)
index f07d95b..8736f7a 100644 (file)
@@ -849,7 +849,9 @@ void lustre_swab_ldlm_policy_data(ldlm_policy_data_t *d)
         __swab64s(&d->l_flock.start);
         __swab64s(&d->l_flock.end);
         __swab64s(&d->l_flock.pid);
+        __swab64s(&d->l_flock.nid);
         __swab64s(&d->l_flock.blocking_pid);
+        __swab64s(&d->l_flock.blocking_nid);
 }
 
 void lustre_swab_ldlm_intent(struct ldlm_intent *i)
index 04fb65b..65b8874 100644 (file)
@@ -395,8 +395,9 @@ check_ldlm_flock(void)
         CHECK_MEMBER(ldlm_flock, start);
         CHECK_MEMBER(ldlm_flock, end);
         CHECK_MEMBER(ldlm_flock, pid);
+        CHECK_MEMBER(ldlm_flock, nid);
         CHECK_MEMBER(ldlm_flock, blocking_pid);
-        CHECK_MEMBER(ldlm_flock, blocking_export);
+        CHECK_MEMBER(ldlm_flock, blocking_nid);
 }
 
 void
@@ -848,6 +849,7 @@ main(int argc, char **argv)
         CHECK_VALUE(LDLM_CANCEL);
         CHECK_VALUE(LDLM_BL_CALLBACK);
         CHECK_VALUE(LDLM_CP_CALLBACK);
+        CHECK_VALUE(LDLM_FLK_DEADLOCK_CHK);
         CHECK_VALUE(LDLM_LAST_OPC);
 
         CHECK_VALUE(LCK_EX);
index dc03a7b..3819669 100644 (file)
@@ -166,7 +166,9 @@ void lustre_assert_wire_constants(void)
                  (unsigned long long)LDLM_BL_CALLBACK);
         LASSERTF(LDLM_CP_CALLBACK == 105, " found %llu\n",
                  (unsigned long long)LDLM_CP_CALLBACK);
-        LASSERTF(LDLM_LAST_OPC == 107, " found %llu\n",
+        LASSERTF(LDLM_FLK_DEADLOCK_CHK == 107, " found %llu\n",
+                 (unsigned long long)LDLM_FLK_DEADLOCK_CHK);
+        LASSERTF(LDLM_LAST_OPC == 108, " found %llu\n",
                  (unsigned long long)LDLM_LAST_OPC);
         LASSERTF(LCK_EX == 1, " found %llu\n",
                  (unsigned long long)LCK_EX);
@@ -922,14 +924,18 @@ void lustre_assert_wire_constants(void)
                  (unsigned long long)(int)offsetof(struct ldlm_flock, pid));
         LASSERTF((int)sizeof(((struct ldlm_flock *)0)->pid) == 8, " found %llu\n",
                  (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->pid));
-        LASSERTF((int)offsetof(struct ldlm_flock, blocking_pid) == 24, " found %llu\n",
+        LASSERTF((int)offsetof(struct ldlm_flock, nid) == 24, " found %llu\n",
+                 (unsigned long long)(int)offsetof(struct ldlm_flock, nid));
+        LASSERTF((int)sizeof(((struct ldlm_flock *)0)->nid) == 8, " found %llu\n",
+                 (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->nid));
+        LASSERTF((int)offsetof(struct ldlm_flock, blocking_pid) == 32, " found %llu\n",
                  (unsigned long long)(int)offsetof(struct ldlm_flock, blocking_pid));
         LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_pid) == 8, " found %llu\n",
                  (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->blocking_pid));
-        LASSERTF((int)offsetof(struct ldlm_flock, blocking_export) == 32, " found %llu\n",
-                 (unsigned long long)(int)offsetof(struct ldlm_flock, blocking_export));
-        LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_export) == 8, " found %llu\n",
-                 (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->blocking_export));
+        LASSERTF((int)offsetof(struct ldlm_flock, blocking_nid) == 40, " found %llu\n",
+                 (unsigned long long)(int)offsetof(struct ldlm_flock, blocking_nid));
+        LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_nid) == 8, " found %llu\n",
+                 (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->blocking_nid));
 
         /* Checks for struct ldlm_intent */
         LASSERTF((int)sizeof(struct ldlm_intent) == 8, " found %llu\n",