Whamcloud - gitweb
LU-571 ldlm: Remove parallel AST limitation
[fs/lustre-release.git] / lustre / ldlm / ldlm_flock.c
index 3a8bd24..7e8a754 100644 (file)
@@ -1,43 +1,72 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
- *   Author: Peter Braam <braam@clusterfs.com>
- *   Author: Phil Schwan <phil@clusterfs.com>
+ * GPL HEADER START
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2003 Hewlett-Packard Development Company LP.
+ * Developed under the sponsorship of the US Government under
+ * Subcontract No. B514193
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
-#include <linux/lustre_dlm.h>
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <portals/list.h>
+#include <lustre_dlm.h>
+#include <obd_support.h>
+#include <obd_class.h>
+#include <lustre_lib.h>
+#include <libcfs/list.h>
 #else
 #include <liblustre.h>
+#include <obd_class.h>
 #endif
 
 #include "ldlm_internal.h"
 
 #define l_flock_waitq   l_lru
 
-static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
+/**
+ * Wait queue for Posix lock deadlock detection, added with
+ * ldlm_lock::l_flock_waitq.
+ */
+static CFS_LIST_HEAD(ldlm_flock_waitq);
+/**
+ * Lock protecting access to ldlm_flock_waitq.
+ */
+cfs_spinlock_t ldlm_flock_waitq_lock = CFS_SPIN_LOCK_UNLOCKED;
 
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                             void *data, int flag);
@@ -45,10 +74,10 @@ int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 /**
  * list_for_remaining_safe - iterate over the remaining entries in a list
  *              and safeguard against removal of a list entry.
- * @pos:        the &struct list_head to use as a loop counter. pos MUST
+ * \param pos   the &struct list_head to use as a loop counter. pos MUST
  *              have been initialized prior to using it in this macro.
- * @n:          another &struct list_head to use as temporary storage
- * @head:       the head for your list.
+ * \param n     another &struct list_head to use as temporary storage
+ * \param head  the head for your list.
  */
 #define list_for_remaining_safe(pos, n, head) \
         for (n = pos->next; pos != (head); pos = n, n = pos->next)
@@ -56,8 +85,8 @@ int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 static inline int
 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
-        return((new->l_policy_data.l_flock.pid ==
-                lock->l_policy_data.l_flock.pid) &&
+        return((new->l_policy_data.l_flock.owner ==
+                lock->l_policy_data.l_flock.owner) &&
                (new->l_export == lock->l_export));
 }
 
@@ -78,16 +107,21 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                    mode, flags);
 
-        LASSERT(list_empty(&lock->l_flock_waitq));
+        /* Safe to not lock here, since it should be empty anyway */
+        LASSERT(cfs_list_empty(&lock->l_flock_waitq));
 
-        list_del_init(&lock->l_res_link);
-        if (flags == LDLM_FL_WAIT_NOREPROC) {
+        cfs_list_del_init(&lock->l_res_link);
+        if (flags == LDLM_FL_WAIT_NOREPROC &&
+            !(lock->l_flags & LDLM_FL_FAILED)) {
                 /* client side - set a flag to prevent sending a CANCEL */
                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
-                ldlm_lock_decref_internal(lock, mode);
+
+                /* when reaching here, it is under lock_res_and_lock(). Thus,
+                   need call the nolock version of ldlm_lock_decref_internal*/
+                ldlm_lock_decref_internal_nolock(lock, mode);
         }
 
-        ldlm_lock_destroy(lock);
+        ldlm_lock_destroy_nolock(lock);
         EXIT;
 }
 
@@ -96,47 +130,55 @@ ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
 {
         struct obd_export *req_export = req->l_export;
         struct obd_export *blocking_export = blocking_lock->l_export;
-        pid_t req_pid = req->l_policy_data.l_flock.pid;
-        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
+        __u64 req_owner = req->l_policy_data.l_flock.owner;
+        __u64 blocking_owner = blocking_lock->l_policy_data.l_flock.owner;
         struct ldlm_lock *lock;
 
+        cfs_spin_lock(&ldlm_flock_waitq_lock);
 restart:
-        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
-                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
+        cfs_list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
+                if ((lock->l_policy_data.l_flock.owner != blocking_owner) ||
                     (lock->l_export != blocking_export))
                         continue;
 
-                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
-                blocking_export = (struct obd_export *)(long)
+                blocking_owner = lock->l_policy_data.l_flock.blocking_owner;
+                blocking_export = (struct obd_export *)
                         lock->l_policy_data.l_flock.blocking_export;
-                if (blocking_pid == req_pid && blocking_export == req_export)
+                if (blocking_owner == req_owner &&
+                    blocking_export == req_export) {
+                        cfs_spin_unlock(&ldlm_flock_waitq_lock);
                         return 1;
+                }
 
                 goto restart;
         }
+        cfs_spin_unlock(&ldlm_flock_waitq_lock);
 
         return 0;
 }
 
 int
 ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
-                        ldlm_error_t *err)
+                        ldlm_error_t *err, cfs_list_t *work_list)
 {
         struct ldlm_resource *res = req->l_resource;
-        struct ldlm_namespace *ns = res->lr_namespace;
-        struct list_head *tmp;
-        struct list_head *ownlocks = NULL;
+        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
+        cfs_list_t *tmp;
+        cfs_list_t *ownlocks = NULL;
         struct ldlm_lock *lock = NULL;
         struct ldlm_lock *new = req;
         struct ldlm_lock *new2 = NULL;
         ldlm_mode_t mode = req->l_req_mode;
-        int local = ns->ns_client;
+        int local = ns_is_client(ns);
         int added = (mode == LCK_NL);
         int overlaps = 0;
+        int splitted = 0;
+        const struct ldlm_callback_suite null_cbs = { NULL };
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags %#x pid "LPU64" mode %u start "LPU64" end "
-               LPU64"\n", *flags, new->l_policy_data.l_flock.pid, mode,
+        CDEBUG(D_DLMTRACE, "flags %#x owner "LPU64" pid %u mode %u start "LPU64
+               " end "LPU64"\n", *flags, new->l_policy_data.l_flock.owner,
+               new->l_policy_data.l_flock.pid, mode,
                req->l_policy_data.l_flock.start,
                req->l_policy_data.l_flock.end);
 
@@ -151,11 +193,13 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 req->l_blocking_ast = ldlm_flock_blocking_ast;
         }
 
+reprocess:
         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                 /* This loop determines where this processes locks start
                  * in the resource lr_granted list. */
-                list_for_each(tmp, &res->lr_granted) {
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                cfs_list_for_each(tmp, &res->lr_granted) {
+                        lock = cfs_list_entry(tmp, struct ldlm_lock,
+                                              l_res_link);
                         if (ldlm_same_flock_owner(lock, req)) {
                                 ownlocks = tmp;
                                 break;
@@ -166,8 +210,9 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
 
                 /* This loop determines if there are existing locks
                  * that conflict with the new lock request. */
-                list_for_each(tmp, &res->lr_granted) {
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                cfs_list_for_each(tmp, &res->lr_granted) {
+                        lock = cfs_list_entry(tmp, struct ldlm_lock,
+                                              l_res_link);
 
                         if (ldlm_same_flock_owner(lock, req)) {
                                 if (!ownlocks)
@@ -210,13 +255,16 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                 RETURN(LDLM_ITER_STOP);
                         }
 
-                        req->l_policy_data.l_flock.blocking_pid =
-                                lock->l_policy_data.l_flock.pid;
+                        req->l_policy_data.l_flock.blocking_owner =
+                                lock->l_policy_data.l_flock.owner;
                         req->l_policy_data.l_flock.blocking_export =
-                                (long)(void *)lock->l_export;
+                                lock->l_export;
 
-                        LASSERT(list_empty(&req->l_flock_waitq));
-                        list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
+                        LASSERT(cfs_list_empty(&req->l_flock_waitq));
+                        cfs_spin_lock(&ldlm_flock_waitq_lock);
+                        cfs_list_add_tail(&req->l_flock_waitq,
+                                          &ldlm_flock_waitq);
+                        cfs_spin_unlock(&ldlm_flock_waitq_lock);
 
                         ldlm_resource_add_lock(res, &res->lr_waiting, req);
                         *flags |= LDLM_FL_BLOCK_GRANTED;
@@ -233,7 +281,9 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
 
         /* In case we had slept on this lock request take it off of the
          * deadlock detection waitq. */
-        list_del_init(&req->l_flock_waitq);
+        cfs_spin_lock(&ldlm_flock_waitq_lock);
+        cfs_list_del_init(&req->l_flock_waitq);
+        cfs_spin_unlock(&ldlm_flock_waitq_lock);
 
         /* Scan the locks owned by this process that overlap this request.
          * We may have to merge or split existing locks. */
@@ -242,7 +292,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 ownlocks = &res->lr_granted;
 
         list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
-                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
+                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);
 
                 if (!ldlm_same_flock_owner(lock, new))
                         break;
@@ -254,7 +304,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                          * overflow and underflow. */
                         if ((new->l_policy_data.l_flock.start >
                              (lock->l_policy_data.l_flock.end + 1))
-                            && (lock->l_policy_data.l_flock.end != ~0))
+                            && (lock->l_policy_data.l_flock.end !=
+                                OBD_OBJECT_EOF))
                                 continue;
 
                         if ((new->l_policy_data.l_flock.end <
@@ -327,20 +378,30 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                  * it must see the original lock data in the reply. */
 
                 /* XXX - if ldlm_lock_new() can sleep we should
-                 * release the ns_lock, allocate the new lock,
+                 * release the lr_lock, allocate the new lock,
                  * and restart processing this lock. */
-                new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
-                                        lock->l_granted_mode, NULL, NULL, NULL,
-                                        NULL, 0);
                 if (!new2) {
-                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
-                        *err = -ENOLCK;
-                        RETURN(LDLM_ITER_STOP);
+                        unlock_res_and_lock(req);
+                         new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
+                                        lock->l_granted_mode, &null_cbs,
+                                        NULL, 0);
+                        lock_res_and_lock(req);
+                        if (!new2) {
+                                ldlm_flock_destroy(req, lock->l_granted_mode,
+                                                   *flags);
+                                *err = -ENOLCK;
+                                RETURN(LDLM_ITER_STOP);
+                        }
+                        goto reprocess;
                 }
 
+                splitted = 1;
+
                 new2->l_granted_mode = lock->l_granted_mode;
                 new2->l_policy_data.l_flock.pid =
                         new->l_policy_data.l_flock.pid;
+                new2->l_policy_data.l_flock.owner =
+                        new->l_policy_data.l_flock.owner;
                 new2->l_policy_data.l_flock.start =
                         lock->l_policy_data.l_flock.start;
                 new2->l_policy_data.l_flock.end =
@@ -349,25 +410,33 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         new->l_policy_data.l_flock.end + 1;
                 new2->l_conn_export = lock->l_conn_export;
                 if (lock->l_export != NULL) {
-                        new2->l_export = class_export_get(lock->l_export);
-                        list_add(&new2->l_export_chain,
-                                 &new2->l_export->exp_ldlm_data.led_held_locks);
+                        new2->l_export = class_export_lock_get(lock->l_export, new2);
+                        if (new2->l_export->exp_lock_hash &&
+                            cfs_hlist_unhashed(&new2->l_exp_hash))
+                                cfs_hash_add(new2->l_export->exp_lock_hash,
+                                             &new2->l_remote_handle,
+                                             &new2->l_exp_hash);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC)
-                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+                        ldlm_lock_addref_internal_nolock(new2,
+                                                         lock->l_granted_mode);
 
                 /* insert new2 at lock */
                 ldlm_resource_add_lock(res, ownlocks, new2);
-                LDLM_LOCK_PUT(new2);
+                LDLM_LOCK_RELEASE(new2);
                 break;
         }
 
+        /* if new2 is created but never used, destroy it*/
+        if (splitted == 0 && new2 != NULL)
+                ldlm_lock_destroy_nolock(new2);
+
         /* At this point we're granting the lock request. */
         req->l_granted_mode = req->l_req_mode;
 
         /* Add req to the granted queue before calling ldlm_reprocess_all(). */
         if (!added) {
-                list_del_init(&req->l_res_link);
+                cfs_list_del_init(&req->l_res_link);
                 /* insert new lock before ownlocks in list. */
                 ldlm_resource_add_lock(res, ownlocks, req);
         }
@@ -375,7 +444,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
                 if (first_enq) {
                         /* If this is an unlock, reprocess the waitq and
-                         * send completions ASTs for locks that can now be 
+                         * send completions ASTs for locks that can now be
                          * granted. The only problem with doing this
                          * reprocessing here is that the completion ASTs for
                          * newly granted locks will be sent before the unlock
@@ -384,24 +453,22 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                          * but only once because first_enq will be false from
                          * ldlm_reprocess_queue. */
                         if ((mode == LCK_NL) && overlaps) {
-                                struct list_head rpc_list
-                                                    = LIST_HEAD_INIT(rpc_list);
+                                CFS_LIST_HEAD(rpc_list);
                                 int rc;
 restart:
-                                res->lr_tmp = &rpc_list;
-                                ldlm_reprocess_queue(res, &res->lr_waiting);
-                                res->lr_tmp = NULL;
-
-                                l_unlock(&ns->ns_lock);
-                                rc = ldlm_run_ast_work(res->lr_namespace,
-                                                       &rpc_list);
-                                l_lock(&ns->ns_lock);
+                                ldlm_reprocess_queue(res, &res->lr_waiting,
+                                                     &rpc_list);
+
+                                unlock_res_and_lock(req);
+                                rc = ldlm_run_ast_work(ns, &rpc_list,
+                                                       LDLM_WORK_CP_AST);
+                                lock_res_and_lock(req);
                                 if (rc == -ERESTART)
                                         GOTO(restart, -ERESTART);
                        }
                 } else {
                         LASSERT(req->l_completion_ast);
-                        ldlm_add_ast_work_item(req, NULL, NULL, 0);
+                        ldlm_add_ast_work_item(req, NULL, work_list);
                 }
         }
 
@@ -412,7 +479,7 @@ restart:
         if (added)
                 ldlm_flock_destroy(req, mode, *flags);
 
-        ldlm_resource_dump(res);
+        ldlm_resource_dump(D_INFO, res);
         RETURN(LDLM_ITER_CONTINUE);
 }
 
@@ -425,73 +492,93 @@ static void
 ldlm_flock_interrupted_wait(void *data)
 {
         struct ldlm_lock *lock;
-        struct lustre_handle lockh;
-        int rc;
         ENTRY;
 
         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
 
         /* take lock off the deadlock detection waitq. */
-        list_del_init(&lock->l_flock_waitq);
+        cfs_spin_lock(&ldlm_flock_waitq_lock);
+        cfs_list_del_init(&lock->l_flock_waitq);
+        cfs_spin_unlock(&ldlm_flock_waitq_lock);
+
+        /* client side - set flag to prevent lock from being put on lru list */
+        lock->l_flags |= LDLM_FL_CBPENDING;
 
-        ldlm_lock_decref_internal(lock, lock->l_req_mode);
-        ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
         EXIT;
 }
 
+/**
+ * Flock completion calback function.
+ *
+ * \param lock [in,out]: A lock to be handled
+ * \param flags    [in]: flags
+ * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
+ *
+ * \retval 0    : success
+ * \retval <0   : failure
+ */
 int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
-        struct ldlm_namespace *ns;
-        struct file_lock *getlk = lock->l_ast_data;
-        struct ldlm_flock_wait_data fwd;
-        unsigned long irqflags;
-        struct obd_device *obd;
-        struct obd_import *imp = NULL;
-        ldlm_error_t err;
-        int rc = 0;
-        struct l_wait_info lwi;
+        cfs_flock_t                    *getlk = lock->l_ast_data;
+        struct obd_device              *obd;
+        struct obd_import              *imp = NULL;
+        struct ldlm_flock_wait_data     fwd;
+        struct l_wait_info              lwi;
+        ldlm_error_t                    err;
+        int                             rc = 0;
         ENTRY;
 
         CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
                flags, data, getlk);
 
-        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
-
-        if (flags == 0) {
-                wake_up(&lock->l_waitq);
+        /* Import invalidation. We need to actually release the lock
+         * references being held, so that it can go away. No point in
+         * holding the lock even if app still believes it has it, since
+         * server already dropped it anyway. Only for granted locks too. */
+        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
+            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
+                if (lock->l_req_mode == lock->l_granted_mode &&
+                    lock->l_granted_mode != LCK_NL &&
+                    NULL == data)
+                        ldlm_lock_decref_internal(lock, lock->l_req_mode);
+
+                /* Need to wake up the waiter if we were evicted */
+                cfs_waitq_signal(&lock->l_waitq);
                 RETURN(0);
         }
 
+        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
+
         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
-                       LDLM_FL_BLOCK_CONV)))
-                goto  granted;
+                       LDLM_FL_BLOCK_CONV))) {
+                if (NULL == data)
+                        /* mds granted the lock in the reply */
+                        goto granted;
+                /* CP AST RPC: lock get granted, wake it up */
+                cfs_waitq_signal(&lock->l_waitq);
+                RETURN(0);
+        }
 
         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                    "sleeping");
-
-        ldlm_lock_dump(D_DLMTRACE, lock, 0);
-
         fwd.fwd_lock = lock;
         obd = class_exp2obd(lock->l_conn_export);
 
-        /* if this is a local lock, then there is no import */
-        if (obd != NULL)
+        /* if this is a local lock, there is no import */
+        if (NULL != obd)
                 imp = obd->u.cli.cl_import;
 
-        if (imp != NULL) {
-                spin_lock_irqsave(&imp->imp_lock, irqflags);
+        if (NULL != imp) {
+                cfs_spin_lock(&imp->imp_lock);
                 fwd.fwd_generation = imp->imp_generation;
-                spin_unlock_irqrestore(&imp->imp_lock, irqflags);
+                cfs_spin_unlock(&imp->imp_lock);
         }
 
         lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
 
         /* Go to sleep until the lock is granted. */
-        rc = l_wait_event(lock->l_waitq,
-                          ((lock->l_req_mode == lock->l_granted_mode) ||
-                           lock->l_destroyed), &lwi);
+        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
 
         if (rc) {
                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
@@ -499,47 +586,69 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 RETURN(rc);
         }
 
-        LASSERT(!(lock->l_destroyed));
-
 granted:
+        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
 
-        LDLM_DEBUG(lock, "client-side enqueue waking up");
-        ns = lock->l_resource->lr_namespace;
-        l_lock(&ns->ns_lock);
+        if (lock->l_destroyed) {
+                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
+                RETURN(0);
+        }
+
+        if (lock->l_flags & LDLM_FL_FAILED) {
+                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
+                RETURN(-EIO);
+        }
+
+        if (rc) {
+                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
+                           rc);
+                RETURN(rc);
+        }
+
+        LDLM_DEBUG(lock, "client-side enqueue granted");
 
         /* take lock off the deadlock detection waitq. */
-        list_del_init(&lock->l_flock_waitq);
+        cfs_spin_lock(&ldlm_flock_waitq_lock);
+        cfs_list_del_init(&lock->l_flock_waitq);
+        cfs_spin_unlock(&ldlm_flock_waitq_lock);
 
+        lock_res_and_lock(lock);
         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
-        list_del_init(&lock->l_res_link);
+        cfs_list_del_init(&lock->l_res_link);
 
         if (flags & LDLM_FL_TEST_LOCK) {
                 /* fcntl(F_GETLK) request */
                 /* The old mode was saved in getlk->fl_type so that if the mode
-                 * in the lock changes we can decref the approprate refcount. */
-                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
+                 * in the lock changes we can decref the appropriate refcount.*/
+                ldlm_flock_destroy(lock, cfs_flock_type(getlk),
+                                   LDLM_FL_WAIT_NOREPROC);
                 switch (lock->l_granted_mode) {
                 case LCK_PR:
-                        getlk->fl_type = F_RDLCK;
+                        cfs_flock_set_type(getlk, F_RDLCK);
                         break;
                 case LCK_PW:
-                        getlk->fl_type = F_WRLCK;
+                        cfs_flock_set_type(getlk, F_WRLCK);
                         break;
                 default:
-                        getlk->fl_type = F_UNLCK;
+                        cfs_flock_set_type(getlk, F_UNLCK);
                 }
-                getlk->fl_pid = lock->l_policy_data.l_flock.pid;
-                getlk->fl_start = lock->l_policy_data.l_flock.start;
-                getlk->fl_end = lock->l_policy_data.l_flock.end;
+                cfs_flock_set_pid(getlk,
+                                  (pid_t)lock->l_policy_data.l_flock.pid);
+                cfs_flock_set_start(getlk,
+                                    (loff_t)lock->l_policy_data.l_flock.start);
+                cfs_flock_set_end(getlk,
+                                  (loff_t)lock->l_policy_data.l_flock.end);
         } else {
+                int noreproc = LDLM_FL_WAIT_NOREPROC;
+
                 /* We need to reprocess the lock to do merges or splits
                  * with existing locks owned by this process. */
-                flags = LDLM_FL_WAIT_NOREPROC;
-                ldlm_process_flock_lock(lock, &flags, 1, &err);
+                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
         }
-        l_unlock(&ns->ns_lock);
+        unlock_res_and_lock(lock);
         RETURN(0);
 }
+EXPORT_SYMBOL(ldlm_flock_completion_ast);
 
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                             void *data, int flag)
@@ -550,11 +659,35 @@ int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         LASSERT(lock);
         LASSERT(flag == LDLM_CB_CANCELING);
 
-        ns = lock->l_resource->lr_namespace;
-        
+        ns = ldlm_lock_to_ns(lock);
+
         /* take lock off the deadlock detection waitq. */
-        l_lock(&ns->ns_lock);
-        list_del_init(&lock->l_flock_waitq);
-        l_unlock(&ns->ns_lock);
+        cfs_spin_lock(&ldlm_flock_waitq_lock);
+        cfs_list_del_init(&lock->l_flock_waitq);
+        cfs_spin_unlock(&ldlm_flock_waitq_lock);
         RETURN(0);
 }
+
+void ldlm_flock_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
+                                     ldlm_policy_data_t *lpolicy)
+{
+        memset(lpolicy, 0, sizeof(*lpolicy));
+        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
+        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
+        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
+        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
+        /* Compat code, old clients had no idea about owner field and
+         * relied solely on pid for ownership. Introduced in 2.1, April 2011 */
+        if (!lpolicy->l_flock.owner)
+                lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
+}
+
+void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
+                                     ldlm_wire_policy_data_t *wpolicy)
+{
+        memset(wpolicy, 0, sizeof(*wpolicy));
+        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
+        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
+        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
+        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
+}