* Author: Peter Braam <braam@clusterfs.com>
* Author: Phil Schwan <phil@clusterfs.com>
*
- * This file is part of Lustre, http://www.lustre.org.
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
*
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
*
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
*
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
*/
#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#endif
-#include <linux/lustre_dlm.h>
-#include <linux/obd_support.h>
-#include <linux/lustre_lib.h>
+#include <lustre_dlm.h>
+#include <obd_support.h>
+#include <lustre_lib.h>
#include "ldlm_internal.h"
ldlm_mode_t req_mode = req->l_req_mode;
__u64 req_start = req->l_req_extent.start;
__u64 req_end = req->l_req_extent.end;
+ __u64 req_align, mask;
int conflicting = 0;
ENTRY;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
l_extent = &lock->l_policy_data.l_extent;
+ /* We already hit the minimum requested size, search no more */
if (new_ex->start == req_start && new_ex->end == req_end) {
EXIT;
return;
new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
new_ex->end);
}
+
+ if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
+ EXIT;
+ return;
+ }
+
+ /* We need to ensure that the lock extent is properly aligned to what
+ * the client requested. We align it to the lowest common denominator
+ * of the client's requested lock start and end alignment. */
+ mask = 0x1000ULL;
+ req_align = (req_end + 1) | req_start;
+ if (req_align != 0) {
+ while ((req_align & mask) == 0)
+ mask <<= 1;
+ }
+ mask -= 1;
+ /* We can only shrink the lock, not grow it.
+ * This should never make the lock smaller than requested,
+ * since the requested lock was already aligned on these boundaries. */
+ new_ex->start = ((new_ex->start - 1) | mask) + 1;
+ new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
+ LASSERTF(new_ex->start <= req_start,
+ "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
+ mask, new_ex->start, req_start);
+ LASSERTF(new_ex->end >= req_end,
+ "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
+ mask, new_ex->end, req_end);
+
EXIT;
}
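
The mask arithmetic above is dense, so a standalone sketch may help. This toy
program (plain C, hypothetical helper name, not part of the patch) repeats the
mask computation and shows why shrinking a grown extent to mask boundaries
still covers the aligned request:

    #include <assert.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Mirror of the computation above: find the largest power-of-two
     * alignment (at least 0x1000) shared by req_start and req_end + 1,
     * returned as a low-bits mask. */
    static uint64_t extent_mask(uint64_t req_start, uint64_t req_end)
    {
            uint64_t mask = 0x1000ULL;
            uint64_t req_align = (req_end + 1) | req_start;

            if (req_align != 0) {
                    while ((req_align & mask) == 0)
                            mask <<= 1;
            }
            return mask - 1;
    }

    int main(void)
    {
            /* Client asked for [0x4000, 0x7fff]: both edges 0x4000-aligned */
            uint64_t req_start = 0x4000, req_end = 0x7fff;
            uint64_t mask = extent_mask(req_start, req_end);    /* 0x3fff */

            /* A grown extent [0x1234, 0x9abc] is shrunk, never grown:
             * start rounds up, end rounds down. */
            uint64_t start = ((0x1234ULL - 1) | mask) + 1;      /* 0x4000 */
            uint64_t end   = ((0x9abcULL + 1) & ~mask) - 1;     /* 0x7fff */

            assert(start <= req_start && end >= req_end);
            printf("mask %#llx -> [%#llx, %#llx]\n",
                   (unsigned long long)mask, (unsigned long long)start,
                   (unsigned long long)end);
            return 0;
    }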
static void ldlm_extent_policy(struct ldlm_resource *res,
struct ldlm_lock *lock, int *flags)
{
- struct ldlm_extent new_ex = { .start = 0, .end = ~0};
+ struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
+
+ if (lock->l_export == NULL)
+ /*
+ * This is a local lock taken by the server (e.g., as part of
+ * OST-side locking, or unlink handling). Expansion doesn't
+ * make much sense for local locks, because they are
+ * dropped immediately on operation completion and would only
+ * conflict with other threads.
+ */
+ return;
- if (lock->l_req_mode == LCK_GROUP)
+ if (lock->l_policy_data.l_extent.start == 0 &&
+ lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
+ /* fast-path for whole-file locks */
return;
ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
*/
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
- int *flags, struct list_head *work_list,
- struct list_head **insertp)
+ int *flags, ldlm_error_t *err,
+ struct list_head *work_list)
{
struct list_head *tmp;
- struct ldlm_lock *lock = req;
+ struct ldlm_lock *lock;
ldlm_mode_t req_mode = req->l_req_mode;
+ __u64 req_start = req->l_req_extent.start;
+ __u64 req_end = req->l_req_extent.end;
int compat = 1;
+ int scan = 0;
ENTRY;
lockmode_verify(req_mode);
- if (req->l_req_mode == LCK_GROUP) {
- int found = 0;
-
- list_for_each(tmp, queue) {
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- if (req == lock)
- break;
+ list_for_each(tmp, queue) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ if (req == lock)
+ RETURN(compat);
+
+ if (unlikely(scan)) {
+ /* We only get here if we are queuing a GROUP lock
+ and have met an incompatible one. The main idea is to
+ insert the GROUP lock after a compatible GROUP lock in
+ the waiting queue or, if there is none,
+ in front of the first non-GROUP lock */
if (lock->l_req_mode != LCK_GROUP) {
- /* group locks are not blocked by waiting PR|PW
- * locks. also, any group locks that could have
- * blocked us would have appeared before this
- * PR|PW lock in the list. */
- if (lock->l_req_mode != lock->l_granted_mode)
- break;
-
- compat = 0;
- if (!work_list)
- break;
-
- if (lock->l_blocking_ast)
- ldlm_add_ast_work_item(lock, req,
- work_list);
- continue;
+ /* OK, we hit a non-GROUP lock; there should
+ * be no more GROUP locks later in the queue, so
+ * insert in front of the first non-GROUP lock */
+
+ ldlm_resource_insert_lock_after(lock, req);
+ list_del_init(&lock->l_res_link);
+ ldlm_resource_insert_lock_after(req, lock);
+ RETURN(0);
}
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* found it */
+ ldlm_resource_insert_lock_after(lock, req);
+ RETURN(0);
+ }
+ continue;
+ }
- /* no blocking ASTs are sent for group locks. */
-
- if (lock->l_policy_data.l_extent.gid !=
- req->l_policy_data.l_extent.gid) {
- if (*flags & LDLM_FL_BLOCK_NOWAIT)
- RETURN(-EWOULDBLOCK);
-
- compat = 0;
-
- /* there's a blocking group lock in front
- * of us on the queue. It can be held
- * indefinitely, so don't timeout. */
- *flags |= LDLM_FL_NO_TIMEOUT;
-
- /* the only reason to continue traversing the
- * list at this point is to find the proper
- * place to insert the lock in the waitq. */
- if (insertp && !found)
- continue;
+ if (lockmode_compat(lock->l_req_mode, req_mode)) {
+ /* compatible non-GROUP locks never conflict; overlap
+ doesn't matter */
+ if (likely(req_mode != LCK_GROUP))
+ continue;
- break;
+ /* If we are trying to get a GROUP lock and there is
+ another one of this kind, we need to compare gids */
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* If an existing lock with a matching gid is
+ granted, we grant the new one too. */
+ if (lock->l_req_mode == lock->l_granted_mode)
+ RETURN(2);
+
+ /* Otherwise we are scanning the queue of waiting
+ * locks, which means the current request would
+ * block along with the existing lock (which is
+ * already blocked).
+ * If we are in nonblocking mode, return
+ * immediately. */
+ if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ compat = -EWOULDBLOCK;
+ goto destroylock;
+ }
+ /* If this group lock is compatible with another
+ * group lock on the waiting list, they must be
+ * together in the list, so they can be granted
+ * at the same time. Otherwise the later lock
+ * can get stuck behind another, incompatible,
+ * lock. */
+ ldlm_resource_insert_lock_after(lock, req);
+ /* Because 'lock' is not granted, we can stop
+ * processing this queue and return immediately.
+ * There is no need to check the rest of the
+ * list. */
+ RETURN(0);
}
+ }
- /* if a group lock with this gid has already been
- * granted then grant this one. */
- if (lock->l_req_mode == lock->l_granted_mode) {
- compat = 2;
- break;
+ if (unlikely(req_mode == LCK_GROUP &&
+ (lock->l_req_mode != lock->l_granted_mode))) {
+ scan = 1;
+ compat = 0;
+ if (lock->l_req_mode != LCK_GROUP) {
+ /* OK, we hit a non-GROUP lock; there should be no
+ more GROUP locks later in the queue, so insert in
+ front of the first non-GROUP lock */
+
+ ldlm_resource_insert_lock_after(lock, req);
+ list_del_init(&lock->l_res_link);
+ ldlm_resource_insert_lock_after(req, lock);
+ RETURN(0);
}
- found = 1;
- }
- } else {
- __u64 req_start = req->l_req_extent.start;
- __u64 req_end = req->l_req_extent.end;
-
- list_for_each(tmp, queue) {
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- if (req == lock) {
- tmp = tmp->next;
- break;
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* found it */
+ ldlm_resource_insert_lock_after(lock, req);
+ RETURN(0);
}
+ continue;
+ }
- if (lock->l_req_mode == LCK_GROUP) {
- if (*flags & LDLM_FL_BLOCK_NOWAIT)
- RETURN(-EWOULDBLOCK);
-
- /* No blocking ASTs are sent for group locks. */
- compat = 0;
-
- /* there's a blocking group lock in front
- * of us on the queue. It can be held
- * indefinitely, so don't timeout. */
+ if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+ /* If the compared lock is GROUP, then the requested
+ * mode is PR/PW, so this is not compatible; the
+ * extent range does not matter */
+ if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ compat = -EWOULDBLOCK;
+ goto destroylock;
+ } else {
*flags |= LDLM_FL_NO_TIMEOUT;
-
- if (work_list)
- continue;
- else
- break;
}
-
- /* locks are compatible, overlap doesn't matter */
- if (lockmode_compat(lock->l_req_mode, req_mode))
- continue;
-
- if (lock->l_policy_data.l_extent.end < req_start ||
- lock->l_policy_data.l_extent.start > req_end)
- continue;
-
- compat = 0;
-
- if (!work_list)
- break;
-
- if (lock->l_blocking_ast)
- ldlm_add_ast_work_item(lock, req, work_list);
+ } else if (lock->l_policy_data.l_extent.end < req_start ||
+ lock->l_policy_data.l_extent.start > req_end) {
+ /* if a non-GROUP lock doesn't overlap, skip it */
+ continue;
}
- /* Ensure we scan entire list if insertp is requested so that
- * the new request will be appended to the end of the list. */
- LASSERTF((insertp == NULL) || (tmp == queue) || (req == lock));
- }
+ if (!work_list)
+ RETURN(0);
- if (insertp)
- *insertp = tmp;
+ compat = 0;
+ if (lock->l_blocking_ast)
+ ldlm_add_ast_work_item(lock, req, work_list);
+ }
RETURN(compat);
+destroylock:
+ list_del_init(&req->l_res_link);
+ ldlm_lock_destroy_nolock(req);
+ *err = compat;
+ RETURN(compat);
}
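
For readers new to extent locks, the non-GROUP branch above reduces to plain
interval overlap: incompatible modes only conflict when the byte ranges
intersect. A toy sketch (illustrative names, not the ldlm API):

    #include <stdint.h>
    #include <stdio.h>

    struct toy_extent { uint64_t start, end; };        /* inclusive range */

    /* Inverse of the skip test above ("end < req_start || start > req_end"):
     * disjoint ranges are simply skipped by the queue scan. */
    static int extents_overlap(const struct toy_extent *a,
                               const struct toy_extent *b)
    {
            return !(a->end < b->start || a->start > b->end);
    }

    int main(void)
    {
            struct toy_extent held = { 0x0, 0xffff };
            struct toy_extent req  = { 0x10000, 0x1ffff };

            /* adjacent but disjoint: no conflict regardless of lock mode */
            printf("overlap: %d\n", extents_overlap(&held, &req)); /* 0 */
            return 0;
    }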
/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
* - blocking ASTs have already been sent
- * - the caller has already initialized req->lr_tmp
* - must call this function with the ns lock held
*
* If first_enq is 1 (ie, called from ldlm_lock_enqueue):
* - blocking ASTs have not been sent
- * - the caller has NOT initialized req->lr_tmp, so we must
* - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
ldlm_error_t *err, struct list_head *work_list)
{
struct ldlm_resource *res = lock->l_resource;
- struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
- struct list_head *insertp = NULL;
+ struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
int rc, rc2;
ENTRY;
LASSERT(list_empty(&res->lr_converting));
+ check_res_locked(res);
*err = ELDLM_OK;
if (!first_enq) {
- /* -EWOULDBLOCK can't occur here since (flags & BLOCK_NOWAIT)
- * lock requests would either be granted or fail on their
- * first_enq. flags should always be zero here, and if that
- * ever changes we want to find out. */
+ /* Careful observers will note that we don't handle -EWOULDBLOCK
+ * here, but it's ok for a non-obvious reason -- compat_queue
+ * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
+ * flags should always be zero here, and if that ever stops
+ * being true, we want to find out. */
LASSERT(*flags == 0);
- rc = ldlm_extent_compat_queue(&res->lr_granted, lock,
- flags, NULL, NULL);
+ rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
+ err, NULL);
if (rc == 1) {
rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
- flags, NULL, NULL);
+ flags, err, NULL);
}
if (rc == 0)
RETURN(LDLM_ITER_STOP);
ldlm_resource_unlink_lock(lock);
+
ldlm_extent_policy(res, lock, flags);
ldlm_grant_lock(lock, work_list);
RETURN(LDLM_ITER_CONTINUE);
}
restart:
- rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, &rpc_list,
- NULL);
+ rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
+ &rpc_list);
if (rc < 0)
- GOTO(destroylock, rc);
+ GOTO(out, rc); /* lock was destroyed */
if (rc == 2)
goto grant;
- /* Traverse the waiting list in case there are other conflicting
- * lock requests ahead of us in the queue and send blocking ASTs */
- rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, &rpc_list,
- &insertp);
+ rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
+ &rpc_list);
if (rc2 < 0)
- GOTO(destroylock, rc);
+ GOTO(out, rc = rc2); /* lock was destroyed */
+
if (rc + rc2 == 2) {
- grant:
+ grant:
ldlm_extent_policy(res, lock, flags);
ldlm_resource_unlink_lock(lock);
ldlm_grant_lock(lock, NULL);
* terrible folly -- if we goto restart, we could get
* re-ordered! Causes deadlock, because ASTs aren't sent! */
if (list_empty(&lock->l_res_link))
- ldlm_resource_add_lock(res, insertp, lock);
+ ldlm_resource_add_lock(res, &res->lr_waiting, lock);
unlock_res(res);
rc = ldlm_run_bl_ast_work(&rpc_list);
lock_res(res);
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
*flags |= LDLM_FL_BLOCK_GRANTED;
- }
+ /* this way we force the client to wait for the lock
+ * endlessly once the lock is enqueued -bzzz */
+ *flags |= LDLM_FL_NO_TIMEOUT;
- RETURN(0);
-
- destroylock:
- list_del_init(&lock->l_res_link);
- unlock_res(res);
- ldlm_lock_destroy(lock);
- lock_res(res);
- *err = rc;
+ }
+ rc = 0;
+out:
RETURN(rc);
}
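
The grant decision above hinges on the compat_queue return contract: 0 means a
conflict was found, 1 means fully compatible, 2 means a granted GROUP lock with
a matching gid was found (grant immediately; in the real code rc == 2 skips the
waiting-queue scan entirely), and a negative value means the request was
destroyed with *err set. A toy distillation of the rc == 2 / rc + rc2 == 2
logic (illustrative only, not the ldlm API):

    #include <stdio.h>

    /* Grant iff the granted queue says "gid match, grant now" (2), or both
     * the granted and waiting scans report full compatibility (1 + 1). */
    static int toy_should_grant(int rc_granted, int rc_waiting)
    {
            if (rc_granted < 0 || rc_waiting < 0)
                    return 0;                    /* request was destroyed */
            return rc_granted == 2 || rc_granted + rc_waiting == 2;
    }

    int main(void)
    {
            printf("%d %d %d\n",
                   toy_should_grant(1, 1),   /* 1: no conflicts anywhere  */
                   toy_should_grant(2, 0),   /* 1: granted-gid match wins */
                   toy_should_grant(1, 0));  /* 0: waiting-queue conflict */
            return 0;
    }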
/* When a lock is cancelled by a client, the KMS may undergo change if this
* is the "highest lock". This function returns the new KMS value.
- * Caller must hold ns_lock already.
+ * Caller must hold ns_lock already.
*
* NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
/* don't let another thread in ldlm_extent_shift_kms race in
* just after we finish and take our lock into account in its
* calculation of the kms */
-
lock->l_flags |= LDLM_FL_KMS_IGNORE;
list_for_each(tmp, &res->lr_granted) {
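
The KMS rule in the comment above ("a lock on [x,y] protects a KMS of up to
y + 1 bytes") is easy to check with a worked example. This toy model
(hypothetical types, not the ldlm structures) recomputes the KMS after the
cancelled lock has been flagged with the ignore bit:

    #include <stdint.h>
    #include <stdio.h>

    struct toy_lock { uint64_t end; int kms_ignore; };

    /* New KMS = largest remaining "end + 1" among granted locks that are
     * not ignored, clamped so it never grows past the old value. */
    static uint64_t toy_shift_kms(const struct toy_lock *locks, int n,
                                  uint64_t old_kms)
    {
            uint64_t kms = 0;
            int i;

            for (i = 0; i < n; i++) {
                    if (locks[i].kms_ignore)
                            continue;
                    /* another lock protects at least as much: no change */
                    if (locks[i].end >= old_kms)
                            return old_kms;
                    if (locks[i].end + 1 > kms)
                            kms = locks[i].end + 1;
            }
            return kms;
    }

    int main(void)
    {
            /* cancel the highest lock [0, 0xffff]; [0, 0x3fff] remains */
            struct toy_lock granted[] = { { 0xffff, 1 }, { 0x3fff, 0 } };

            printf("new kms %#llx\n", (unsigned long long)
                   toy_shift_kms(granted, 2, 0x10000));        /* 0x4000 */
            return 0;
    }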