* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2011, Whamcloud, Inc.
+ * Copyright (c) 2010, 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
+/**
+ * This file implements POSIX lock type for Lustre.
+ * Its policy properties are start and end of extent and PID.
+ *
+ * These locks are handled only through the MDS because POSIX semantics
+ * require, e.g., that a lock can be partially released (and thus split
+ * into two parts), and that two adjacent locks from the same process may
+ * be merged into a single wider lock.
+ *
+ * Lock modes are mapped like this:
+ * PR and PW for READ and WRITE locks
+ * NL to request the release of a portion of the lock
+ *
+ * These flock locks never time out.
+ */
+
#define DEBUG_SUBSYSTEM S_LDLM
#ifdef __KERNEL__
lock->l_policy_data.l_flock.start));
}
/*
 * Add \a req to the exp_flock_hash of its export.
 *
 * NOTE(review): this is a diff hunk. The '-' lines are the old version,
 * which returned int and created the hash on demand through
 * ldlm_init_flock_export(); the '+' lines are the new void version, which
 * uses req->l_export->exp_flock_hash without checking it for NULL.
 */
-static inline int ldlm_flock_blocking_link(struct ldlm_lock *req,
- struct ldlm_lock *lock)
+static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
+ struct ldlm_lock *lock)
{
- int rc = 0;
-
/* For server only: a request without an export is never hashed. */
if (req->l_export == NULL)
- return 0;
-
- if (unlikely(req->l_export->exp_flock_hash == NULL)) {
- rc = ldlm_init_flock_export(req->l_export);
- if (rc)
- goto error;
- }
+ return;
/* req must not be hashed yet; it is added with the address of its
 * flock owner as the second cfs_hash_add() argument (presumably the
 * hash key -- confirm against the cfs_hash API). */
LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));
cfs_hash_add(req->l_export->exp_flock_hash,
&req->l_policy_data.l_flock.owner,
&req->l_exp_flock_hash);
-error:
- return rc;
}
static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
}
/*
 * Log the request and take \a lock off the list it is linked on through
 * l_res_link.
 *
 * NOTE(review): this is a diff hunk; the change widens \a flags from int to
 * __u64 and adjusts the debug format accordingly. Statements of the real
 * function may be elided between the visible lines.
 */
static inline void
-ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
+ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
ENTRY;
- LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
- mode, flags);
+ LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
+ mode, flags);
- /* Safe to not lock here, since it should be empty anyway */
+ /* Safe to not lock here, since it should be empty anyway */
LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));
/* Unlink from whatever list l_res_link is on and reinitialise the
 * link. */
cfs_list_del_init(&lock->l_res_link);
EXIT;
}
+/**
+ * POSIX locks deadlock detection code.
+ *
+ * Given a new lock \a req and an existing lock \a bl_lock it conflicts
+ * with, we need to iterate through all blocked POSIX locks for this
+ * export and see if there is a deadlock condition arising (i.e. when
+ * one client holds a lock on something and wants a lock on something
+ * else, and at the same time another client has the opposite situation).
+ */
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
if (lock == NULL)
break;
+ LASSERT(req != lock);
flock = &lock->l_policy_data.l_flock;
LASSERT(flock->owner == bl_owner);
bl_owner = flock->blocking_owner;
return 0;
}
+/**
+ * Cancel a flock lock that was found to be deadlocked during reprocessing.
+ *
+ * If the export of \a lock does not have OBD_CONNECT_FLOCK_DEAD set in its
+ * connect flags, the deadlock is only reported with CERROR() and \a lock
+ * is left untouched. Otherwise \a lock is flagged with
+ * LDLM_FL_FLOCK_DEADLOCK (together with LDLM_FL_AST_SENT and
+ * LDLM_FL_CANCEL_ON_BLOCK), passed to ldlm_flock_blocking_unlink() and
+ * ldlm_resource_unlink_lock(), and queued on \a work_list as an AST work
+ * item.
+ *
+ * \param lock [in,out]: the deadlocked lock request
+ * \param work_list [in,out]: list that receives the AST work item
+ */
+static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
+					  cfs_list_t *work_list)
+{
+	CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
+
+	if ((exp_connect_flags(lock->l_export) &
+	     OBD_CONNECT_FLOCK_DEAD) == 0) {
+		CERROR("deadlock found, but client doesn't "
+		       "support flock cancellation\n");
+	} else {
+		/* A completion AST must be registered and LDLM_FL_AST_SENT
+		 * must not be set yet. */
+		LASSERT(lock->l_completion_ast);
+		LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
+		lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
+				 LDLM_FL_FLOCK_DEADLOCK;
+		ldlm_flock_blocking_unlink(lock);
+		ldlm_resource_unlink_lock(lock);
+		ldlm_add_ast_work_item(lock, NULL, work_list);
+	}
+}
+
+/**
+ * Process a granting attempt for flock lock.
+ * Must be called with the ns lock held.
+ *
+ * This function looks for any conflicts for \a lock in the granted or
+ * waiting queues. The lock is granted if no conflicts are found in
+ * either queue.
+ *
+ * It is also responsible for splitting a lock if a portion of the lock
+ * is released.
+ *
+ * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
+ * - blocking ASTs have already been sent
+ *
+ * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
+ * - blocking ASTs have not been sent yet, so the list of conflicting
+ * locks is collected and the ASTs are sent.
+ */
int
-ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
- ldlm_error_t *err, cfs_list_t *work_list)
+ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
+ ldlm_error_t *err, cfs_list_t *work_list)
{
struct ldlm_resource *res = req->l_resource;
struct ldlm_namespace *ns = ldlm_res_to_ns(res);
int overlaps = 0;
int splitted = 0;
const struct ldlm_callback_suite null_cbs = { NULL };
- int rc;
ENTRY;
- CDEBUG(D_DLMTRACE, "flags %#x owner "LPU64" pid %u mode %u start "LPU64
- " end "LPU64"\n", *flags, new->l_policy_data.l_flock.owner,
+ CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
+ LPU64" end "LPU64"\n", *flags,
+ new->l_policy_data.l_flock.owner,
new->l_policy_data.l_flock.pid, mode,
req->l_policy_data.l_flock.start,
req->l_policy_data.l_flock.end);
}
}
} else {
+ int reprocess_failed = 0;
lockmode_verify(mode);
/* This loop determines if there are existing locks
if (!ldlm_flocks_overlap(lock, req))
continue;
- if (!first_enq)
- RETURN(LDLM_ITER_CONTINUE);
+ if (!first_enq) {
+ reprocess_failed = 1;
+ if (ldlm_flock_deadlock(req, lock)) {
+ ldlm_flock_cancel_on_deadlock(req,
+ work_list);
+ RETURN(LDLM_ITER_CONTINUE);
+ }
+ continue;
+ }
if (*flags & LDLM_FL_BLOCK_NOWAIT) {
ldlm_flock_destroy(req, mode, *flags);
RETURN(LDLM_ITER_STOP);
}
- if (ldlm_flock_deadlock(req, lock)) {
- ldlm_flock_destroy(req, mode, *flags);
- *err = -EDEADLK;
- RETURN(LDLM_ITER_STOP);
- }
+ /* add lock to blocking list before deadlock
+ * check to prevent race */
+ ldlm_flock_blocking_link(req, lock);
- rc = ldlm_flock_blocking_link(req, lock);
- if (rc) {
+ if (ldlm_flock_deadlock(req, lock)) {
+ ldlm_flock_blocking_unlink(req);
ldlm_flock_destroy(req, mode, *flags);
- *err = rc;
+ *err = -EDEADLK;
RETURN(LDLM_ITER_STOP);
}
+
ldlm_resource_add_lock(res, &res->lr_waiting, req);
*flags |= LDLM_FL_BLOCK_GRANTED;
RETURN(LDLM_ITER_STOP);
}
+ if (reprocess_failed)
+ RETURN(LDLM_ITER_CONTINUE);
}
if (*flags & LDLM_FL_TEST_LOCK) {
* and restart processing this lock. */
if (!new2) {
unlock_res_and_lock(req);
- new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
- lock->l_granted_mode, &null_cbs,
- NULL, 0);
+ new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
+ lock->l_granted_mode, &null_cbs,
+ NULL, 0, LVB_T_NONE);
lock_res_and_lock(req);
if (!new2) {
ldlm_flock_destroy(req, lock->l_granted_mode,
#endif /* HAVE_SERVER_SUPPORT */
}
- /* In case we're reprocessing the requested lock we can't destroy
- * it until after calling ldlm_ast_work_item() above so that lawi()
- * can bump the reference count on req. Otherwise req could be freed
- * before the completion AST can be sent. */
+ /* In case we're reprocessing the requested lock we can't destroy
+ * it until after calling ldlm_add_ast_work_item() above so that laawi()
+ * can bump the reference count on \a req. Otherwise \a req
+ * could be freed before the completion AST can be sent. */
if (added)
ldlm_flock_destroy(req, mode, *flags);
lock_res_and_lock(lock);
ldlm_flock_blocking_unlink(lock);
- /* client side - set flag to prevent lock from being put on lru list */
+ /* client side - set flag to prevent lock from being put on LRU list */
lock->l_flags |= LDLM_FL_CBPENDING;
unlock_res_and_lock(lock);
}
/**
- * Flock completion calback function.
+ * Flock completion callback function.
*
* \param lock [in,out]: A lock to be handled
* \param flags [in]: flags
* \retval <0 : failure
*/
int
-ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
+ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
- cfs_flock_t *getlk = lock->l_ast_data;
+ struct file_lock *getlk = lock->l_ast_data;
struct obd_device *obd;
struct obd_import *imp = NULL;
struct ldlm_flock_wait_data fwd;
int rc = 0;
ENTRY;
- CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
+ CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
flags, data, getlk);
/* Import invalidation. We need to actually release the lock
imp = obd->u.cli.cl_import;
if (NULL != imp) {
- cfs_spin_lock(&imp->imp_lock);
- fwd.fwd_generation = imp->imp_generation;
- cfs_spin_unlock(&imp->imp_lock);
+ spin_lock(&imp->imp_lock);
+ fwd.fwd_generation = imp->imp_generation;
+ spin_unlock(&imp->imp_lock);
}
lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
granted:
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
- if (lock->l_destroyed) {
- LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
- RETURN(0);
- }
+ if (lock->l_flags & LDLM_FL_DESTROYED) {
+ LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
+ RETURN(0);
+ }
if (lock->l_flags & LDLM_FL_FAILED) {
LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
/* ldlm_lock_enqueue() has already placed lock on the granted list. */
cfs_list_del_init(&lock->l_res_link);
- if (flags & LDLM_FL_TEST_LOCK) {
+ if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
+ LDLM_DEBUG(lock, "client-side enqueue deadlock received");
+ rc = -EDEADLK;
+ } else if (flags & LDLM_FL_TEST_LOCK) {
/* fcntl(F_GETLK) request */
/* The old mode was saved in getlk->fl_type so that if the mode
* in the lock changes we can decref the appropriate refcount.*/
- ldlm_flock_destroy(lock, cfs_flock_type(getlk),
- LDLM_FL_WAIT_NOREPROC);
- switch (lock->l_granted_mode) {
- case LCK_PR:
- cfs_flock_set_type(getlk, F_RDLCK);
- break;
- case LCK_PW:
- cfs_flock_set_type(getlk, F_WRLCK);
- break;
- default:
- cfs_flock_set_type(getlk, F_UNLCK);
- }
- cfs_flock_set_pid(getlk,
- (pid_t)lock->l_policy_data.l_flock.pid);
- cfs_flock_set_start(getlk,
- (loff_t)lock->l_policy_data.l_flock.start);
- cfs_flock_set_end(getlk,
- (loff_t)lock->l_policy_data.l_flock.end);
- } else {
- int noreproc = LDLM_FL_WAIT_NOREPROC;
-
- /* We need to reprocess the lock to do merges or splits
- * with existing locks owned by this process. */
- ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
- }
- unlock_res_and_lock(lock);
- RETURN(0);
+ ldlm_flock_destroy(lock, flock_type(getlk),
+ LDLM_FL_WAIT_NOREPROC);
+ switch (lock->l_granted_mode) {
+ case LCK_PR:
+ flock_set_type(getlk, F_RDLCK);
+ break;
+ case LCK_PW:
+ flock_set_type(getlk, F_WRLCK);
+ break;
+ default:
+ flock_set_type(getlk, F_UNLCK);
+ }
+ flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
+ flock_set_start(getlk,
+ (loff_t)lock->l_policy_data.l_flock.start);
+ flock_set_end(getlk,
+ (loff_t)lock->l_policy_data.l_flock.end);
+ } else {
+ __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
+
+ /* We need to reprocess the lock to do merges or splits
+ * with existing locks owned by this process. */
+ ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
+ }
+ unlock_res_and_lock(lock);
+ RETURN(rc);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);
int ldlm_init_flock_export(struct obd_export *exp)
{
+ if( strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
+ RETURN(0);
+
exp->exp_flock_hash =
cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
HASH_EXP_LOCK_CUR_BITS,