diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
index 800880c..cbeacfa 100644
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
+/*
+ * 2003 - 2005 Copyright, Hewlett-Packard Development Company, LP.
+ *
+ * Developed under the sponsorship of the U.S. Government
+ *     under Subcontract No. B514193
+ */
+
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
 #include <linux/obd_support.h>
 #include <linux/obd_class.h>
 #include <linux/lustre_lib.h>
-#include <portals/list.h>
+#include <libcfs/list.h>
 #else
 #include <liblustre.h>
 #endif
 
 #include "ldlm_internal.h"
 
-#define l_flock_waitq   l_lru
-
 static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
-
-int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                            void *data, int flag);
+static int ldlm_deadlock_timeout = 30 * HZ;
 
 /**
  * list_for_remaining_safe - iterate over the remaining entries in a list
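
The context above ends mid-comment; list_for_remaining_safe picks up
iteration at the current position "pos" rather than at the list head. A
minimal sketch of how such a macro is commonly written, assuming it mirrors
the kernel's list_for_each_safe (not quoted verbatim from the Lustre tree):

    #define list_for_remaining_safe(pos, n, head) \
            for (n = pos->next; pos != (head); pos = n, n = pos->next)

The temporary "n" caches the next entry so the current one can be unlinked
from the list without breaking the walk.
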
@@ -58,7 +61,8 @@ ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
         return((new->l_policy_data.l_flock.pid ==
                 lock->l_policy_data.l_flock.pid) &&
-               (new->l_export == lock->l_export));
+               (new->l_policy_data.l_flock.nid ==
+                lock->l_policy_data.l_flock.nid));
 }
 
 static inline int
@@ -78,11 +82,12 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                    mode, flags);
 
-        LASSERT(list_empty(&lock->l_flock_waitq));
-
+        /* don't need to take the locks here because the lock
+         * is on a local destroy list, not the resource list. */
         list_del_init(&lock->l_res_link);
+
         if (flags == LDLM_FL_WAIT_NOREPROC) {
-                /* client side - set a flag to prevent sending a CANCEL */
+                /* client side - set flags to prevent sending a CANCEL */
                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
                 ldlm_lock_decref_internal(lock, mode);
         }
@@ -91,73 +96,44 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         EXIT;
 }
 
-static int
-ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
-{
-        struct obd_export *req_export = req->l_export;
-        struct obd_export *blocking_export = blocking_lock->l_export;
-        pid_t req_pid = req->l_policy_data.l_flock.pid;
-        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
-        struct ldlm_lock *lock;
-
-restart:
-        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
-                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
-                    (lock->l_export != blocking_export))
-                        continue;
-
-                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
-                blocking_export = (struct obd_export *)(long)
-                        lock->l_policy_data.l_flock.blocking_export;
-                if (blocking_pid == req_pid && blocking_export == req_export)
-                        return 1;
-
-                goto restart;
-        }
-
-        return 0;
-}
-
 int
 ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
-                        ldlm_error_t *err)
+                        ldlm_error_t *err, struct list_head *work_list)
 {
+        struct list_head destroy_list = LIST_HEAD_INIT(destroy_list);
         struct ldlm_resource *res = req->l_resource;
         struct ldlm_namespace *ns = res->lr_namespace;
-        struct list_head *tmp;
-        struct list_head *ownlocks = NULL;
-        struct ldlm_lock *lock = NULL;
+        struct list_head *pos;
+        struct list_head *tmp = NULL;
+        struct ldlm_lock *lock;
         struct ldlm_lock *new = req;
-        struct ldlm_lock *new2 = NULL;
+        struct ldlm_lock *new2;
         ldlm_mode_t mode = req->l_req_mode;
-        int local = ns->ns_client;
         int added = (mode == LCK_NL);
         int overlaps = 0;
+        int rc = LDLM_ITER_CONTINUE;
+        int i = 0;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags %#x pid "LPU64" mode %u start "LPU64" end "
-               LPU64"\n", *flags, new->l_policy_data.l_flock.pid, mode,
-               req->l_policy_data.l_flock.start,
+        CDEBUG(D_DLMTRACE, "flags %#x mode %u pid "LPU64" nid "LPU64" "
+               "start "LPU64" end "LPU64"\n", *flags, mode,
+               req->l_policy_data.l_flock.pid,
+               req->l_policy_data.l_flock.nid,
+               req->l_policy_data.l_flock.start,
                req->l_policy_data.l_flock.end);
 
         *err = ELDLM_OK;
 
-        if (local) {
-                /* No blocking ASTs are sent to the clients for
-                 * Posix file & record locks */
-                req->l_blocking_ast = NULL;
-        } else {
-                /* Called on the server for lock cancels. */
-                req->l_blocking_ast = ldlm_flock_blocking_ast;
-        }
+        /* No blocking ASTs are sent for Posix file & record locks */
+        req->l_blocking_ast = NULL;
 
         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                 /* This loop determines where this processes locks start
                  * in the resource lr_granted list. */
-                list_for_each(tmp, &res->lr_granted) {
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                list_for_each(pos, &res->lr_granted) {
+                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
                         if (ldlm_same_flock_owner(lock, req)) {
-                                ownlocks = tmp;
+                                tmp = pos;
                                 break;
                         }
                 }
@@ -166,12 +142,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
 
                 /* This loop determines if there are existing locks
                  * that conflict with the new lock request. */
-                list_for_each(tmp, &res->lr_granted) {
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                list_for_each(pos, &res->lr_granted) {
+                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
 
                         if (ldlm_same_flock_owner(lock, req)) {
-                                if (!ownlocks)
-                                        ownlocks = tmp;
+                                if (!tmp)
+                                        tmp = pos;
                                 continue;
                         }
 
@@ -182,42 +158,39 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         if (!ldlm_flocks_overlap(lock, req))
                                 continue;
 
+                        /* deadlock detection will be postponed until
+                         * ldlm_flock_completion_ast(). */
+
+                        *flags |= LDLM_FL_LOCK_CHANGED;
+
+                        req->l_policy_data.l_flock.blocking_pid =
+                                lock->l_policy_data.l_flock.pid;
+                        req->l_policy_data.l_flock.blocking_nid =
+                                lock->l_policy_data.l_flock.nid;
+
                         if (!first_enq)
                                 RETURN(LDLM_ITER_CONTINUE);
 
                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                ldlm_flock_destroy(req, mode, *flags);
+                                list_move(&req->l_res_link, &destroy_list);
                                 *err = -EAGAIN;
-                                RETURN(LDLM_ITER_STOP);
+                                GOTO(out, rc = LDLM_ITER_STOP);
                         }
 
                         if (*flags & LDLM_FL_TEST_LOCK) {
-                                ldlm_flock_destroy(req, mode, *flags);
                                 req->l_req_mode = lock->l_granted_mode;
                                 req->l_policy_data.l_flock.pid =
                                         lock->l_policy_data.l_flock.pid;
+                                req->l_policy_data.l_flock.nid =
+                                        lock->l_policy_data.l_flock.nid;
                                 req->l_policy_data.l_flock.start =
                                         lock->l_policy_data.l_flock.start;
                                 req->l_policy_data.l_flock.end =
                                         lock->l_policy_data.l_flock.end;
-                                *flags |= LDLM_FL_LOCK_CHANGED;
-                                RETURN(LDLM_ITER_STOP);
-                        }
-
-                        if (ldlm_flock_deadlock(req, lock)) {
-                                ldlm_flock_destroy(req, mode, *flags);
-                                *err = -EDEADLK;
-                                RETURN(LDLM_ITER_STOP);
+                                list_move(&req->l_res_link, &destroy_list);
+                                GOTO(out, rc = LDLM_ITER_STOP);
                         }
 
-                        req->l_policy_data.l_flock.blocking_pid =
-                                lock->l_policy_data.l_flock.pid;
-                        req->l_policy_data.l_flock.blocking_export =
-                                (long)(void *)lock->l_export;
-
-                        LASSERT(list_empty(&req->l_flock_waitq));
-                        list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
-
                         ldlm_resource_add_lock(res, &res->lr_waiting, req);
                         *flags |= LDLM_FL_BLOCK_GRANTED;
                         RETURN(LDLM_ITER_STOP);
@@ -225,24 +198,18 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         }
 
         if (*flags & LDLM_FL_TEST_LOCK) {
-                ldlm_flock_destroy(req, mode, *flags);
                 req->l_req_mode = LCK_NL;
                 *flags |= LDLM_FL_LOCK_CHANGED;
-                RETURN(LDLM_ITER_STOP);
+                list_move(&req->l_res_link, &destroy_list);
+                GOTO(out, rc = LDLM_ITER_STOP);
         }
 
-        /* In case we had slept on this lock request take it off of the
-         * deadlock detection waitq. */
-        list_del_init(&req->l_flock_waitq);
-
         /* Scan the locks owned by this process that overlap this request.
          * We may have to merge or split existing locks. */
+        pos = (tmp != NULL) ? tmp : &res->lr_granted;
 
-        if (!ownlocks)
-                ownlocks = &res->lr_granted;
-
-        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
-                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
+        list_for_remaining_safe(pos, tmp, &res->lr_granted) {
+                lock = list_entry(pos, struct ldlm_lock, l_res_link);
 
                 if (!ldlm_same_flock_owner(lock, new))
                         break;
@@ -281,7 +248,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         }
 
                         if (added) {
-                                ldlm_flock_destroy(lock, mode, *flags);
+                                list_move(&lock->l_res_link, &destroy_list);
                         } else {
                                 new = lock;
                                 added = 1;
@@ -307,7 +274,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                         new->l_policy_data.l_flock.end + 1;
                                 break;
                         }
-                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
+                        list_move(&lock->l_res_link, &destroy_list);
                         continue;
                 }
                 if (new->l_policy_data.l_flock.end >=
@@ -333,14 +300,16 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                         lock->l_granted_mode, NULL, NULL, NULL,
                                         NULL, 0);
                 if (!new2) {
-                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
+                        list_move(&req->l_res_link, &destroy_list);
                         *err = -ENOLCK;
-                        RETURN(LDLM_ITER_STOP);
+                        GOTO(out, rc = LDLM_ITER_STOP);
                 }
 
                 new2->l_granted_mode = lock->l_granted_mode;
                 new2->l_policy_data.l_flock.pid =
                         new->l_policy_data.l_flock.pid;
+                new2->l_policy_data.l_flock.nid =
+                        new->l_policy_data.l_flock.nid;
                 new2->l_policy_data.l_flock.start =
                         lock->l_policy_data.l_flock.start;
                 new2->l_policy_data.l_flock.end =
@@ -354,20 +323,26 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                  &new2->l_export->exp_ldlm_data.led_held_locks);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC)
-                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+                        ldlm_lock_addref_internal_nolock(new2,
+                                                         lock->l_granted_mode);
 
                 /* insert new2 at lock */
-                ldlm_resource_add_lock(res, ownlocks, new2);
+                ldlm_resource_add_lock(res, pos, new2);
                 LDLM_LOCK_PUT(new2);
                 break;
         }
 
-        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
-        if (!added) {
-                req->l_granted_mode = req->l_req_mode;
+        /* At this point we're granting the lock request. */
+        req->l_granted_mode = req->l_req_mode;
+
+        if (added) {
+                list_move(&req->l_res_link, &destroy_list);
+        } else {
+                /* Add req to the granted queue before calling
+                 * ldlm_reprocess_all() below. */
                 list_del_init(&req->l_res_link);
-                /* insert new lock before ownlocks in list. */
-                ldlm_resource_add_lock(res, ownlocks, req);
+                /* insert new lock before pos in the list. */
+                ldlm_resource_add_lock(res, pos, req);
         }
 
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
@@ -382,36 +357,288 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                          * but only once because first_enq will be false from
                          * ldlm_reprocess_queue. */
                         if ((mode == LCK_NL) && overlaps) {
-                                struct list_head rpc_list
-                                                    = LIST_HEAD_INIT(rpc_list);
+                                struct list_head rpc_list =
+                                                     LIST_HEAD_INIT(rpc_list);
                                 int rc;
-restart:
-                                res->lr_tmp = &rpc_list;
-                                ldlm_reprocess_queue(res, &res->lr_waiting);
-                                res->lr_tmp = NULL;
-
-                                l_unlock(&ns->ns_lock);
-                                rc = ldlm_run_ast_work(res->lr_namespace,
-                                                       &rpc_list);
-                                l_lock(&ns->ns_lock);
+ restart:
+                                ldlm_reprocess_queue(res, &res->lr_waiting,
+                                                     &rpc_list);
+                                unlock_res(res);
+                                rc = ldlm_run_cp_ast_work(&rpc_list);
+                                lock_res(res);
                                 if (rc == -ERESTART)
                                         GOTO(restart, -ERESTART);
                         }
                 } else {
                         LASSERT(req->l_completion_ast);
-                        ldlm_add_ast_work_item(req, NULL, NULL, 0);
+                        ldlm_add_ast_work_item(req, NULL, work_list);
                 }
         }
 
-        /* In case we're reprocessing the requested lock we can't destroy
-         * it until after calling ldlm_ast_work_item() above so that lawi()
-         * can bump the reference count on req. Otherwise req could be freed
-         * before the completion AST can be sent.  */
-        if (added)
-                ldlm_flock_destroy(req, mode, *flags);
+ out:
+        if (!list_empty(&destroy_list)) {
+                /* FIXME: major hack. when called from ldlm_lock_enqueue()
+                 * the res and the lock are locked. When called from
+                 * ldlm_reprocess_queue() the res is locked but the lock
+                 * is not. */
+                if (added && first_enq && res->lr_namespace->ns_client)
+                        unlock_bitlock(req);
+
+                unlock_res(res);
+
+                CDEBUG(D_DLMTRACE, "Destroy locks:\n");
+
+                list_for_each_safe(pos, tmp, &destroy_list) {
+                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
+                        ldlm_lock_dump(D_DLMTRACE, lock, ++i);
+                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
+                }
+
+                if (added && first_enq && res->lr_namespace->ns_client)
+                        lock_bitlock(req);
+
+                lock_res(res);
+        }
 
-        ldlm_resource_dump(res);
-        RETURN(LDLM_ITER_CONTINUE);
+        RETURN(rc);
+}
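
A note on the destroy_list mechanics above: ldlm_flock_destroy() is no longer
called while walking lr_granted. Victim locks are moved onto a private
destroy_list under the resource lock and only destroyed after that lock (and,
on the client enqueue path, the lock's bitlock) has been dropped, which is
the ordering problem the FIXME refers to. A hypothetical helper showing just
that pattern (the helper name is illustrative; it is not part of the patch):

    static void
    flock_destroy_deferred(struct ldlm_resource *res,
                           struct list_head *destroy_list, int flags)
    {
            struct list_head *pos, *tmp;
            struct ldlm_lock *lock;

            if (list_empty(destroy_list))
                    return;

            /* drop the resource lock so ldlm_flock_destroy() can take
             * whatever locks it needs without inverting the lock order */
            unlock_res(res);
            list_for_each_safe(pos, tmp, destroy_list) {
                    lock = list_entry(pos, struct ldlm_lock, l_res_link);
                    ldlm_flock_destroy(lock, lock->l_req_mode, flags);
            }
            lock_res(res);
    }
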
+
+struct ldlm_sleep_flock {
+        __u64 lsf_pid;
+        __u64 lsf_nid;
+        __u64 lsf_blocking_pid;
+        __u64 lsf_blocking_nid;
+        struct list_head lsf_list;
+};
+
+int
+ldlm_handle_flock_deadlock_check(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        struct ldlm_sleep_flock *lsf;
+        struct list_head *pos;
+        __u64 pid, nid, blocking_pid, blocking_nid;
+        unsigned int flags;
+        int rc = 0;
+        ENTRY;
+
+        req->rq_status = 0;
+
+        dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
+                                      lustre_swab_ldlm_request);
+        if (dlm_req == NULL) {
+                CERROR("bad request buffer for flock deadlock check\n");
+                RETURN(-EFAULT);
+        }
+
+        flags = dlm_req->lock_flags;
+        pid = dlm_req->lock_desc.l_policy_data.l_flock.pid;
+        nid = dlm_req->lock_desc.l_policy_data.l_flock.nid;
+        blocking_pid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_pid;
+        blocking_nid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_nid;
+
+        CDEBUG(D_DLMTRACE, "flags: 0x%x req: pid: "LPU64" nid "LPU64" "
+               "blk: pid: "LPU64" nid: "LPU64"\n",
+               dlm_req->lock_flags, pid, nid, blocking_pid, blocking_nid);
+
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                struct ldlm_lock *lock;
+                struct ldlm_reply *dlm_rep;
+                int size = sizeof(*dlm_rep);
+
+                lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+                if (!lock) {
+                        CERROR("received deadlock check for unknown lock "
+                               "cookie "LPX64" from client %s id %s\n",
+                               dlm_req->lock_handle1.cookie,
+                               req->rq_export->exp_client_uuid.uuid,
+                               req->rq_peerstr);
+                        req->rq_status = -ESTALE;
+                        RETURN(0);
+                }
+
+                lock_res_and_lock(lock);
+                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
+                blocking_nid = lock->l_policy_data.l_flock.blocking_nid;
+                unlock_res_and_lock(lock);
+
+                rc = lustre_pack_reply(req, 1, &size, NULL);
+                if (rc) {
+                        CERROR("lustre_pack_reply failed: rc = %d\n", rc);
+                        req->rq_status = rc;
+                        RETURN(0);
+                }
+
+                dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*dlm_rep));
+                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_pid =
+                        blocking_pid;
+                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_nid =
+                        blocking_nid;
+        } else {
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+        }
+
+        if (flags & LDLM_FL_DEADLOCK_CHK) {
+                __u64 orig_blocking_pid = blocking_pid;
+                __u64 orig_blocking_nid = blocking_nid;
+ restart:
+                list_for_each(pos, &ldlm_flock_waitq) {
+                        lsf = list_entry(pos,struct ldlm_sleep_flock,lsf_list);
+
+                        /* We want the deadlock to be detected by the last
+                         * lock on the waitq that closed the cycle; POSIX
+                         * verification suites expect this behavior. So if
+                         * we reach this process's own entry before finding
+                         * a deadlock, stop scanning and just update its
+                         * blocking info, leaving detection to a later
+                         * waiter in the loop. */
+                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
+                                lsf->lsf_blocking_pid = blocking_pid;
+                                lsf->lsf_blocking_nid = blocking_nid;
+                                break;
+                        }
+
+                        if ((lsf->lsf_pid != blocking_pid) ||
+                            (lsf->lsf_nid != blocking_nid))
+                                continue;
+
+                        blocking_pid = lsf->lsf_blocking_pid;
+                        blocking_nid = lsf->lsf_blocking_nid;
+
+                        if (blocking_pid == pid && blocking_nid == nid) {
+                                req->rq_status = -EDEADLOCK;
+                                flags |= LDLM_FL_DEADLOCK_DEL;
+                                break;
+                        }
+
+                        goto restart;
+                }
+
+                /* If we got all the way through the list then we're not on it. */
+                if (pos == &ldlm_flock_waitq) {
+                        OBD_ALLOC(lsf, sizeof(*lsf));
+                        if (!lsf)
+                                RETURN(-ENOMEM);
+
+                        lsf->lsf_pid = pid;
+                        lsf->lsf_nid = nid;
+                        lsf->lsf_blocking_pid = orig_blocking_pid;
+                        lsf->lsf_blocking_nid = orig_blocking_nid;
+                        list_add_tail(&lsf->lsf_list, &ldlm_flock_waitq);
+                }
+        }
+
+        if (flags & LDLM_FL_DEADLOCK_DEL) {
+                list_for_each_entry(lsf, &ldlm_flock_waitq, lsf_list) {
+                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
+                                list_del_init(&lsf->lsf_list);
+                                OBD_FREE(lsf, sizeof(*lsf));
+                                break;
+                        }
+                }
+        }
+
+        RETURN(rc);
+}
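
Each ldlm_sleep_flock entry records one wait-for edge, (pid, nid) ->
(blocking_pid, blocking_nid), so the loop above is a walk of the wait-for
graph. Worked example: P1 waits on P2 and P2 waits on P3, so the waitq holds
P1->P2 and P2->P3; when P3 enqueues a lock blocked by P1, its check starts
with blocking = P1, follows P1's entry to P2, restarts, follows P2's entry
to P3, and P3 being the requester closes the cycle, so -EDEADLOCK is
returned. A compact user-space rendering of the same scan (pids only, nids
dropped for brevity; illustrative rather than the server code):

    struct sleeper { int pid; int blocking_pid; };

    /* 1 if following wait-for edges from blocking_pid loops back to pid
     * before the scan reaches pid's own queue entry */
    static int deadlock(const struct sleeper *waitq, int n,
                        int pid, int blocking_pid)
    {
            int i;
    restart:
            for (i = 0; i < n; i++) {
                    if (waitq[i].pid == pid)
                            return 0;   /* our own entry: defer detection
                                         * to a later waiter */
                    if (waitq[i].pid != blocking_pid)
                            continue;
                    blocking_pid = waitq[i].blocking_pid;
                    if (blocking_pid == pid)
                            return 1;   /* cycle closed: deadlock */
                    goto restart;
            }
            return 0;                   /* blocker is not sleeping */
    }
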
+
+int
+ldlm_send_flock_deadlock_check(struct obd_device *obd, struct ldlm_lock *lock,
+                               unsigned int flags)
+{
+        struct obd_import *imp;
+        struct ldlm_request *body;
+        struct ldlm_reply *reply;
+        struct ptlrpc_request *req;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        CDEBUG(D_DLMTRACE, "obd: %p flags: 0x%x\n", obd, flags);
+
+        imp = obd->u.cli.cl_import;
+        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_FLK_DEADLOCK_CHK, 1,
+                              &size, NULL);
+        if (!req)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        body->lock_flags = flags;
+        ldlm_lock2desc(lock, &body->lock_desc);
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                size = sizeof(*reply);
+                req->rq_replen = lustre_msg_size(1, &size);
+        } else {
+                req->rq_replen = lustre_msg_size(0, NULL);
+        }
+
+        rc = ptlrpc_queue_wait(req);
+        if (rc != ELDLM_OK)
+                GOTO(out, rc);
+
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                           lustre_swab_ldlm_reply);
+                if (reply == NULL) {
+                        CERROR ("Can't unpack ldlm_reply\n");
+                        GOTO (out, rc = -EPROTO);
+                }
+
+                lock->l_policy_data.l_flock.blocking_pid =
+                        reply->lock_desc.l_policy_data.l_flock.blocking_pid;
+                lock->l_policy_data.l_flock.blocking_nid =
+                        reply->lock_desc.l_policy_data.l_flock.blocking_nid;
+
+                CDEBUG(D_DLMTRACE, "LDLM_FL_GET_BLOCKING: pid: "LPU64" "
+                       "nid: "LPU64" blk: pid: "LPU64" nid: "LPU64"\n",
+                       lock->l_policy_data.l_flock.pid,
+                       lock->l_policy_data.l_flock.nid,
+                       lock->l_policy_data.l_flock.blocking_pid,
+                       lock->l_policy_data.l_flock.blocking_nid);
+        }
+
+        rc = req->rq_status;
+ out:
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
+int
+ldlm_flock_deadlock_check(struct obd_device *master_obd, struct obd_device *obd,
+                          struct ldlm_lock *lock)
+{
+        unsigned int flags = 0;
+        int rc;
+        ENTRY;
+
+        if (obd == NULL) {
+                /* Delete this process from the sleeplock list. */
+                flags = LDLM_FL_DEADLOCK_DEL;
+                rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);
+                RETURN(rc);
+        }
+
+        flags = LDLM_FL_GET_BLOCKING;
+        if (obd == master_obd)
+                flags |= LDLM_FL_DEADLOCK_CHK;
+
+        rc = ldlm_send_flock_deadlock_check(obd, lock, flags);
+        CDEBUG(D_DLMTRACE, "1st check: rc: %d flags: 0x%x\n", rc, flags);
+        if (rc || (flags & LDLM_FL_DEADLOCK_CHK))
+                RETURN(rc);
+
+        CDEBUG(D_DLMTRACE, "about to send 2nd check: master: %p.\n",
+                master_obd);
+
+        flags = LDLM_FL_DEADLOCK_CHK;
+
+        rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);
+
+        CDEBUG(D_DLMTRACE, "2nd check: rc: %d flags: 0x%x\n", rc, flags);
+
+        RETURN(rc);
 }
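
When the blocked lock lives on a server other than the designated master
that keeps the global waitq, the check above takes two RPCs: the first, with
LDLM_FL_GET_BLOCKING, fetches the current blocking owner from the server
holding the lock; the second, with LDLM_FL_DEADLOCK_CHK, reports that
wait-for edge to the master and runs the cycle scan there. If the lock's
server is itself the master, both flags travel in a single RPC, and passing
obd == NULL instead sends LDLM_FL_DEADLOCK_DEL to drop this process's entry
from the master's waitq. Roughly:

    client                  server S (holds lock)        master M (waitq)
      |--- GET_BLOCKING --------->|                           |
      |<-- blocking pid/nid ------|                           |
      |--- DEADLOCK_CHK (wait-for edge) ---------------------->|
      |<-- rq_status: 0, or -EDEADLOCK if the cycle closed ----|
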
 
 struct ldlm_flock_wait_data {
@@ -424,44 +651,37 @@ ldlm_flock_interrupted_wait(void *data)
 {
         struct ldlm_lock *lock;
         struct lustre_handle lockh;
-        int rc;
         ENTRY;
 
         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
 
-        /* take lock off the deadlock detection waitq. */
-        list_del_init(&lock->l_flock_waitq);
+        /* client side - set flag to prevent lock from being put on lru list */
+        lock_res_and_lock(lock);
+        lock->l_flags |= LDLM_FL_CBPENDING;
+        unlock_res_and_lock(lock);
 
         ldlm_lock_decref_internal(lock, lock->l_req_mode);
         ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
+        ldlm_cli_cancel(&lockh);
         EXIT;
 }
 
 int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
-        struct ldlm_namespace *ns;
-        struct file_lock *getlk = lock->l_ast_data;
         struct ldlm_flock_wait_data fwd;
         unsigned long irqflags;
         struct obd_device *obd;
+        struct obd_device *master_obd = (struct obd_device *)lock->l_ast_data;
         struct obd_import *imp = NULL;
         ldlm_error_t err;
+        int deadlock_checked = 0;
         int rc = 0;
         struct l_wait_info lwi;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
-               flags, data, getlk);
-
         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
-        if (flags == 0) {
-                wake_up(&lock->l_waitq);
-                RETURN(0);
-        }
-
         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                        LDLM_FL_BLOCK_CONV)))
                 goto  granted;
@@ -470,10 +690,12 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                    "sleeping");
 
         ldlm_lock_dump(D_DLMTRACE, lock, 0);
-
         fwd.fwd_lock = lock;
         obd = class_exp2obd(lock->l_conn_export);
 
+        CDEBUG(D_DLMTRACE, "flags: 0x%x master: %p obd: %p\n",
+               flags, master_obd, obd);
+
         /* if this is a local lock, then there is no import */
         if (obd != NULL)
                 imp = obd->u.cli.cl_import;
@@ -484,75 +706,52 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 spin_unlock_irqrestore(&imp->imp_lock, irqflags);
         }
 
-        lwi = LWI_TIMEOUT_INTR(0,NULL,ldlm_flock_interrupted_wait,&fwd);
+        lwi = LWI_TIMEOUT_INTR(ldlm_deadlock_timeout, NULL,
+                               ldlm_flock_interrupted_wait, &fwd);
 
-        /* Go to sleep until the lock is granted. */
+ restart:
         rc = l_wait_event(lock->l_waitq,
                           ((lock->l_req_mode == lock->l_granted_mode) ||
                            lock->l_destroyed), &lwi);
 
-        if (rc) {
-                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
-                           rc);
-                RETURN(rc);
+        if (rc == -ETIMEDOUT) {
+                deadlock_checked = 1;
+                rc = ldlm_flock_deadlock_check(master_obd, obd, lock);
+                if (rc == -EDEADLK)
+                        ldlm_flock_interrupted_wait(&fwd);
+                else {
+                        CDEBUG(D_DLMTRACE, "lock: %p going back to sleep,\n",
+                               lock);
+                        goto restart;
+                }
+        } else {
+                if (deadlock_checked)
+                        ldlm_flock_deadlock_check(master_obd, NULL, lock);
         }
 
-        LASSERT(!(lock->l_destroyed));
-
-granted:
-
-        LDLM_DEBUG(lock, "client-side enqueue waking up");
-        ns = lock->l_resource->lr_namespace;
-        l_lock(&ns->ns_lock);
-
-        /* take lock off the deadlock detection waitq. */
-        list_del_init(&lock->l_flock_waitq);
+        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
+        RETURN(rc);
+ granted:
+        LDLM_DEBUG(lock, "client-side enqueue granted");
+        lock_res_and_lock(lock);
 
         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
         list_del_init(&lock->l_res_link);
 
         if (flags & LDLM_FL_TEST_LOCK) {
-                /* fcntl(F_GETLK) request */
-                /* The old mode was saved in getlk->fl_type so that if the mode
-                 * in the lock changes we can decref the approprate refcount. */
-                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
-                switch (lock->l_granted_mode) {
-                case LCK_PR:
-                        getlk->fl_type = F_RDLCK;
-                        break;
-                case LCK_PW:
-                        getlk->fl_type = F_WRLCK;
-                        break;
-                default:
-                        getlk->fl_type = F_UNLCK;
-                }
-                getlk->fl_pid = lock->l_policy_data.l_flock.pid;
-                getlk->fl_start = lock->l_policy_data.l_flock.start;
-                getlk->fl_end = lock->l_policy_data.l_flock.end;
+                /* client side - set flag to prevent sending a CANCEL */
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
         } else {
+                int noreproc = LDLM_FL_WAIT_NOREPROC;
+
                 /* We need to reprocess the lock to do merges or splits
                  * with existing locks owned by this process. */
-                flags = LDLM_FL_WAIT_NOREPROC;
-                ldlm_process_flock_lock(lock, &flags, 1, &err);
+                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
+                if (flags == 0)
+                        wake_up(&lock->l_waitq);
         }
-        l_unlock(&ns->ns_lock);
-        RETURN(0);
-}
-
-int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                            void *data, int flag)
-{
-        struct ldlm_namespace *ns;
-        ENTRY;
 
-        LASSERT(lock);
-        LASSERT(flag == LDLM_CB_CANCELING);
-
-        ns = lock->l_resource->lr_namespace;
-        
-        /* take lock off the deadlock detection waitq. */
-        l_lock(&ns->ns_lock);
-        list_del_init(&lock->l_flock_waitq);
-        l_unlock(&ns->ns_lock);
+        unlock_res_and_lock(lock);
         RETURN(0);
 }
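
With the client-side deadlock waitq gone, detection is now timeout driven: a
waiter sleeps for ldlm_deadlock_timeout (30 seconds), runs the distributed
check on each timeout, and goes back to sleep unless a cycle was found; once
the lock is granted (or the wait fails for another reason) any entry it
registered on the master is deleted. A simplified shape of that loop, using
the names above (granted() stands in for the real wakeup condition):

    for (;;) {
            rc = l_wait_event(lock->l_waitq, granted(lock), &lwi);
            if (rc != -ETIMEDOUT)
                    break;                  /* granted or interrupted */
            deadlock_checked = 1;
            rc = ldlm_flock_deadlock_check(master_obd, obd, lock);
            if (rc == -EDEADLK) {
                    /* cancel the lock; the server already dropped our
                     * waitq entry when it returned the deadlock */
                    ldlm_flock_interrupted_wait(&fwd);
                    break;
            }
            /* no cycle found yet: sleep for another period */
    }
    if (deadlock_checked && rc != -EDEADLK)
            /* obd == NULL means LDLM_FL_DEADLOCK_DEL: remove our entry */
            ldlm_flock_deadlock_check(master_obd, NULL, lock);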