Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ldlm / ldlm_extent.c
index 6f3d57a..7d9e1bf 100644 (file)
@@ -5,20 +5,23 @@
  *   Author: Peter Braam <braam@clusterfs.com>
  *   Author: Phil Schwan <phil@clusterfs.com>
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
  */
 
 #define DEBUG_SUBSYSTEM S_LDLM
@@ -26,9 +29,9 @@
 # include <liblustre.h>
 #endif
 
-#include <linux/lustre_dlm.h>
-#include <linux/obd_support.h>
-#include <linux/lustre_lib.h>
+#include <lustre_dlm.h>
+#include <obd_support.h>
+#include <lustre_lib.h>
 
 #include "ldlm_internal.h"
 
@@ -45,6 +48,7 @@ ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
         ldlm_mode_t req_mode = req->l_req_mode;
         __u64 req_start = req->l_req_extent.start;
         __u64 req_end = req->l_req_extent.end;
+        __u64 req_align, mask;
         int conflicting = 0;
         ENTRY;
 
@@ -57,6 +61,7 @@ ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                 l_extent = &lock->l_policy_data.l_extent;
 
+                /* We already hit the minimum requested size, search no more */
                 if (new_ex->start == req_start && new_ex->end == req_end) {
                         EXIT;
                         return;
@@ -124,6 +129,34 @@ ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                           new_ex->end);
         }
+
+        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
+                EXIT;
+                return;
+        }
+
+        /* we need to ensure that the lock extent is properly aligned to what
+         * the client requested.  We align it to the lowest-common denominator
+         * of the clients requested lock start and end alignment. */
+        mask = 0x1000ULL;
+        req_align = (req_end + 1) | req_start;
+        if (req_align != 0) {
+                while ((req_align & mask) == 0)
+                        mask <<= 1;
+        }
+        mask -= 1;
+        /* We can only shrink the lock, not grow it.
+         * This should never cause lock to be smaller than requested,
+         * since requested lock was already aligned on these boundaries. */
+        new_ex->start = ((new_ex->start - 1) | mask) + 1;
+        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
+        LASSERTF(new_ex->start <= req_start,
+                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
+                 mask, new_ex->start, req_start);
+        LASSERTF(new_ex->end >= req_end,
+                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
+                 mask, new_ex->end, req_end);
+
         EXIT;
 }
 
@@ -132,9 +165,21 @@ ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
 static void ldlm_extent_policy(struct ldlm_resource *res,
                                struct ldlm_lock *lock, int *flags)
 {
-        struct ldlm_extent new_ex = { .start = 0, .end = ~0};
+        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
+
+        if (lock->l_export == NULL)
+                /*
+                 * this is local lock taken by server (e.g., as a part of
+                 * OST-side locking, or unlink handling). Expansion doesn't
+                 * make a lot of sense for local locks, because they are
+                 * dropped immediately on operation completion and would only
+                 * conflict with other threads.
+                 */
+                return;
 
-        if (lock->l_req_mode == LCK_GROUP)
+        if (lock->l_policy_data.l_extent.start == 0 &&
+            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
+                /* fast-path whole file locks */
                 return;
 
         ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
@@ -159,189 +204,202 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
  */
 static int
 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                         int *flags, struct list_head *work_list,
-                         struct list_head **insertp)
+                         int *flags, ldlm_error_t *err,
+                         struct list_head *work_list)
 {
         struct list_head *tmp;
-        struct ldlm_lock *lock = req;
+        struct ldlm_lock *lock;
         ldlm_mode_t req_mode = req->l_req_mode;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
         int compat = 1;
+        int scan = 0;
         ENTRY;
 
         lockmode_verify(req_mode);
 
-        if (req->l_req_mode == LCK_GROUP) {
-                int found = 0;
-
-                list_for_each(tmp, queue) {
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-                        if (req == lock)
-                                break;
+        list_for_each(tmp, queue) {
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
+                if (req == lock)
+                        RETURN(compat);
+
+                if (unlikely(scan)) {
+                        /* We only get here if we are queuing GROUP lock
+                           and met some incompatible one. The main idea of this
+                           code is to insert GROUP lock past compatible GROUP
+                           lock in the waiting queue or if there is not any,
+                           then in front of first non-GROUP lock */
                         if (lock->l_req_mode != LCK_GROUP) {
-                                /* group locks are not blocked by waiting PR|PW
-                                 * locks. also, any group locks that could have
-                                 * blocked us would have appeared before this
-                                 * PR|PW lock in the list.  */
-                                if (lock->l_req_mode != lock->l_granted_mode)
-                                        break;
-
-                                compat = 0;
-                                if (!work_list)
-                                        break;
-
-                                if (lock->l_blocking_ast)
-                                        ldlm_add_ast_work_item(lock, req,
-                                                               work_list);
-                                continue;
+                                /* Ok, we hit non-GROUP lock, there should
+                                 * be no more GROUP locks later on, queue in
+                                 * front of first non-GROUP lock */
+
+                                ldlm_resource_insert_lock_after(lock, req);
+                                list_del_init(&lock->l_res_link);
+                                ldlm_resource_insert_lock_after(req, lock);
+                                RETURN(0);
                         }
+                        if (req->l_policy_data.l_extent.gid ==
+                             lock->l_policy_data.l_extent.gid) {
+                                /* found it */
+                                ldlm_resource_insert_lock_after(lock, req);
+                                RETURN(0);
+                        }
+                        continue;
+                }
 
-                        /* no blocking ASTs are sent for group locks. */
-
-                        if (lock->l_policy_data.l_extent.gid !=
-                            req->l_policy_data.l_extent.gid) {
-                                if (*flags & LDLM_FL_BLOCK_NOWAIT)
-                                        RETURN(-EWOULDBLOCK);
-
-                                compat = 0;
-
-                                /* there's a blocking group lock in front
-                                 * of us on the queue.  It can be held
-                                 * indefinitely, so don't timeout. */
-                                *flags |= LDLM_FL_NO_TIMEOUT;
-
-                                /* the only reason to continue traversing the
-                                 * list at this point is to find the proper
-                                 * place to insert the lock in the waitq. */
-                                if (insertp && !found)
-                                        continue;
+                /* locks are compatible, overlap doesn't matter */
+                if (lockmode_compat(lock->l_req_mode, req_mode)) {
+                        /* non-group locks are compatible, overlap doesn't
+                           matter */
+                        if (likely(req_mode != LCK_GROUP))
+                                continue;
 
-                                break;
+                        /* If we are trying to get a GROUP lock and there is
+                           another one of this kind, we need to compare gid */
+                        if (req->l_policy_data.l_extent.gid ==
+                            lock->l_policy_data.l_extent.gid) {
+                                /* If existing lock with matched gid is granted,
+                                   we grant new one too. */
+                                if (lock->l_req_mode == lock->l_granted_mode)
+                                        RETURN(2);
+
+                                /* Otherwise we are scanning queue of waiting
+                                 * locks and it means current request would
+                                 * block along with existing lock (that is
+                                 * already blocked.
+                                 * If we are in nonblocking mode - return
+                                 * immediately */
+                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                        compat = -EWOULDBLOCK;
+                                        goto destroylock;
+                                }
+                                /* If this group lock is compatible with another
+                                 * group lock on the waiting list, they must be
+                                 * together in the list, so they can be granted
+                                 * at the same time.  Otherwise the later lock
+                                 * can get stuck behind another, incompatible,
+                                 * lock. */
+                                ldlm_resource_insert_lock_after(lock, req);
+                                /* Because 'lock' is not granted, we can stop
+                                 * processing this queue and return immediately.
+                                 * There is no need to check the rest of the
+                                 * list. */
+                                RETURN(0);
                         }
+                }
 
-                        /* if a group lock with this gid has already been
-                         * granted then grant this one. */
-                        if (lock->l_req_mode == lock->l_granted_mode) {
-                                compat = 2;
-                                break;
+                if (unlikely(req_mode == LCK_GROUP &&
+                    (lock->l_req_mode != lock->l_granted_mode))) {
+                        scan = 1;
+                        compat = 0;
+                        if (lock->l_req_mode != LCK_GROUP) {
+                        /* Ok, we hit non-GROUP lock, there should be no
+                           more GROUP locks later on, queue in front of
+                           first non-GROUP lock */
+
+                                ldlm_resource_insert_lock_after(lock, req);
+                                list_del_init(&lock->l_res_link);
+                                ldlm_resource_insert_lock_after(req, lock);
+                                RETURN(0);
                         }
-                        found = 1;
-                }
-        } else {
-                __u64 req_start = req->l_req_extent.start;
-                __u64 req_end = req->l_req_extent.end;
-
-                list_for_each(tmp, queue) {
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-                        if (req == lock) {
-                                tmp = tmp->next;
-                                break;
+                        if (req->l_policy_data.l_extent.gid ==
+                             lock->l_policy_data.l_extent.gid) {
+                                /* found it */
+                                ldlm_resource_insert_lock_after(lock, req);
+                                RETURN(0);
                         }
+                        continue;
+                }
 
-                        if (lock->l_req_mode == LCK_GROUP) {
-                                if (*flags & LDLM_FL_BLOCK_NOWAIT)
-                                        RETURN(-EWOULDBLOCK);
-
-                                /* No blocking ASTs are sent for group locks. */
-                                compat = 0;
-
-                                /* there's a blocking group lock in front
-                                 * of us on the queue.  It can be held
-                                 * indefinitely, so don't timeout. */
+                if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+                        /* If compared lock is GROUP, then requested is PR/PW/
+                         * so this is not compatible; extent range does not
+                         * matter */
+                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                compat = -EWOULDBLOCK;
+                                goto destroylock;
+                        } else {
                                 *flags |= LDLM_FL_NO_TIMEOUT;
-
-                                if (work_list)
-                                        continue;
-                                else
-                                        break;
                         }
-
-                        /* locks are compatible, overlap doesn't matter */
-                        if (lockmode_compat(lock->l_req_mode, req_mode))
-                                continue;
-
-                        if (lock->l_policy_data.l_extent.end < req_start ||
-                            lock->l_policy_data.l_extent.start > req_end)
-                                continue;
-
-                        compat = 0;
-
-                        if (!work_list)
-                                break;
-
-                        if (lock->l_blocking_ast)
-                                ldlm_add_ast_work_item(lock, req, work_list);
+                } else if (lock->l_policy_data.l_extent.end < req_start ||
+                           lock->l_policy_data.l_extent.start > req_end) {
+                        /* if a non group lock doesn't overlap skip it */
+                        continue;
                 }
 
-                /* Ensure we scan entire list if insertp is requested so that
-                 * the new request will be appended to the end of the list. */
-                LASSERTF((insertp == NULL) || (tmp == queue) || (req == lock));
-        }
+                if (!work_list)
+                        RETURN(0);
 
-        if (insertp)
-                *insertp = tmp;
+                compat = 0;
+                if (lock->l_blocking_ast)
+                        ldlm_add_ast_work_item(lock, req, work_list);
+        }
 
         RETURN(compat);
+destroylock:
+        list_del_init(&req->l_res_link);
+        ldlm_lock_destroy_nolock(req);
+        *err = compat;
+        RETURN(compat);
 }
 
 /* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
   *   - blocking ASTs have already been sent
-  *   - the caller has already initialized req->lr_tmp
   *   - must call this function with the ns lock held
   *
   * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
   *   - blocking ASTs have not been sent
-  *   - the caller has NOT initialized req->lr_tmp, so we must
   *   - must call this function with the ns lock held once */
 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                              ldlm_error_t *err, struct list_head *work_list)
 {
         struct ldlm_resource *res = lock->l_resource;
-        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
-        struct list_head *insertp = NULL;
+        struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
         int rc, rc2;
         ENTRY;
 
         LASSERT(list_empty(&res->lr_converting));
+        check_res_locked(res);
         *err = ELDLM_OK;
 
         if (!first_enq) {
-                /* -EWOULDBLOCK can't occur here since (flags & BLOCK_NOWAIT)
-                 * lock requests would either be granted or fail on their
-                 * first_enq. flags should always be zero here, and if that
-                 * ever changes we want to find out. */
+                /* Careful observers will note that we don't handle -EWOULDBLOCK
+                 * here, but it's ok for a non-obvious reason -- compat_queue
+                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
+                 * flags should always be zero here, and if that ever stops
+                 * being true, we want to find out. */
                 LASSERT(*flags == 0);
-                rc = ldlm_extent_compat_queue(&res->lr_granted, lock,
-                                              flags, NULL, NULL);
+                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
+                                              err, NULL);
                 if (rc == 1) {
                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
-                                                      flags, NULL, NULL);
+                                                      flags, err, NULL);
                 }
                 if (rc == 0)
                         RETURN(LDLM_ITER_STOP);
 
                 ldlm_resource_unlink_lock(lock);
+
                 ldlm_extent_policy(res, lock, flags);
                 ldlm_grant_lock(lock, work_list);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
  restart:
-        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, &rpc_list,
-                                      NULL);
+        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list);
         if (rc < 0)
-                GOTO(destroylock, rc);
+                GOTO(out, rc); /* lock was destroyed */
         if (rc == 2)
                 goto grant;
 
-        /* Traverse the waiting list in case there are other conflicting
-         * lock requests ahead of us in the queue and send blocking ASTs */
-        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, &rpc_list,
-                                       &insertp);
+        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list);
         if (rc2 < 0)
-                GOTO(destroylock, rc);
+                GOTO(out, rc = rc2); /* lock was destroyed */
+
         if (rc + rc2 == 2) {
- grant:
       grant:
                 ldlm_extent_policy(res, lock, flags);
                 ldlm_resource_unlink_lock(lock);
                 ldlm_grant_lock(lock, NULL);
@@ -353,29 +411,26 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                  * terrible folly -- if we goto restart, we could get
                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                 if (list_empty(&lock->l_res_link))
-                        ldlm_resource_add_lock(res, insertp, lock);
+                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                 unlock_res(res);
                 rc = ldlm_run_bl_ast_work(&rpc_list);
                 lock_res(res);
                 if (rc == -ERESTART)
                         GOTO(restart, -ERESTART);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
-        }
+                /* this way we force client to wait for the lock
+                 * endlessly once the lock is enqueued -bzzz */
+                *flags |= LDLM_FL_NO_TIMEOUT;
 
-        RETURN(0);
-
- destroylock:
-        list_del_init(&lock->l_res_link);
-        unlock_res(res);
-        ldlm_lock_destroy(lock);
-        lock_res(res);
-        *err = rc;
+        }
+        rc = 0;
+out:
         RETURN(rc);
 }
 
 /* When a lock is cancelled by a client, the KMS may undergo change if this
  * is the "highest lock".  This function returns the new KMS value.
- * Caller must hold ns_lock already. 
+ * Caller must hold ns_lock already.
  *
  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
@@ -389,7 +444,6 @@ __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
         /* don't let another thread in ldlm_extent_shift_kms race in
          * just after we finish and take our lock into account in its
          * calculation of the kms */
-
         lock->l_flags |= LDLM_FL_KMS_IGNORE;
 
         list_for_each(tmp, &res->lr_granted) {