Add support for asynchronous flocks.
They are currently used only by the Linux nfsd.
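
The asynchronous path is taken when the caller provides
fl_lmops->lm_grant, as lockd does on behalf of nfsd: ll_file_flock()
then returns FILE_LOCK_DEFERRED and the final result is delivered
later through lm_grant().  A minimal sketch of that caller-side
contract (example_caller() below is hypothetical, not part of this
change):

    static int example_caller(struct file *file, struct file_lock *fl)
    {
        /* fl->fl_lmops->lm_grant must be set for the deferred path */
        int rc = vfs_lock_file(file, F_SETLK, fl, NULL);

        if (rc == FILE_LOCK_DEFERRED)
            /* the result is reported later via lm_grant(fl, err) */
            return 0;
        return rc;
    }
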
HPE-bug-id: LUS-3210, LUS-7034, LUS-7031, LUS-8832, LUS-8313
HPE-bug-id: LUS-8592
Change-Id: Iefafaf014fd06d569dc5d1dd22ebb3518d04e99a
Reviewed-by: Vitaly Fertman <c17818@cray.com>
Reviewed-by: Alexander Boyko <c17825@cray.com>
Signed-off-by: Andriy Skulysh <andriy.skulysh@hpe.com>
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/4889
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Alexander Boyko <alexander.boyko@hpe.com>
Reviewed-by: Alexey Lyashkov <alexey.lyashkov@hpe.com>
Reviewed-by: Vitaly Fertman <vitaly.fertman@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
]) # LC_HAVE_BLK_INTEGRITY_ITER
#
+# LC_HAVE_LM_GRANT_2ARGS
+#
+# 3.17 removed unused argument from lm_grant
+#
+AC_DEFUN([LC_HAVE_LM_GRANT_2ARGS], [
+LB_CHECK_COMPILE([if 'lock_manager_operations.lm_grant' takes two args],
+lm_grant, [
+ #include <linux/fs.h>
+],[
+ ((struct lock_manager_operations *)NULL)->lm_grant(NULL, 0);
+],[
+ AC_DEFINE(HAVE_LM_GRANT_2ARGS, 1,
+ [lock_manager_operations.lm_grant takes two args])
+])
+]) # LC_HAVE_LM_GRANT_2ARGS
+
+#
# LC_NFS_FILLDIR_USE_CTX
#
# 3.18 kernel moved from void cookie to struct dir_context
[kernel has locks_lock_file_wait in filelock.h])
AC_DEFINE(HAVE_LINUX_FILELOCK_HEADER, 1,
[linux/filelock.h is present])
+ AC_DEFINE(HAVE_LM_GRANT_2ARGS, 1,
+ [lock_manager_operations.lm_grant takes two args])
])
]) # LC_HAVE_LOCKS_LOCK_FILE_WAIT_IN_FILELOCK
LC_HAVE_INTERVAL_BLK_INTEGRITY
LC_KEY_MATCH_DATA
LC_HAVE_BLK_INTEGRITY_ITER
+ LC_HAVE_LM_GRANT_2ARGS
# 3.18
LC_PERCPU_COUNTER_INIT
#include <lustre_import.h>
#include <lustre_handles.h>
#include <linux/interval_tree_generic.h>
+#ifdef HAVE_LINUX_FILELOCK_HEADER
+#include <linux/filelock.h>
+#endif
#include "lustre_dlm_flags.h"
* that are waiting for conflicts to go away
*/
struct list_head lr_waiting;
+	/* List of flock locks waiting to be enqueued */
+ struct list_head lr_enqueueing;
/** @} */
/** Resource name */
#define ei_res_id ei_cb_gl
+enum ldlm_flock_flags {
+ FA_FL_CANCEL_RQST = 1,
+ FA_FL_CANCELED = 2,
+};
+
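+/* Per-request flock state: attached to the LDLM lock via l_ast_data at
+ * enqueue time and consumed by the flock completion ASTs.
+ */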
+struct ldlm_flock_info {
+ struct file *fa_file;
+ struct file_lock *fa_fl; /* original file_lock */
+ struct file_lock fa_flc; /* lock copy */
+ enum ldlm_flock_flags fa_flags;
+ enum ldlm_mode fa_mode;
+#ifdef HAVE_LM_GRANT_2ARGS
+ int (*fa_notify)(struct file_lock *, int);
+#else
+ int (*fa_notify)(struct file_lock *, struct file_lock *, int);
+#endif
+ int fa_err;
+ int fa_ready;
+ wait_queue_head_t fa_waitq;
+};
+
extern char *ldlm_lockname[];
extern char *ldlm_typename[];
extern const char *ldlm_it2str(enum ldlm_intent_flags it);
/* ldlm_flock.c */
int ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data);
+struct ldlm_flock_info *
+ldlm_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags,
+ void *data);
/* ldlm_extent.c */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms);
const union ldlm_policy_data *, struct md_op_data *,
struct lustre_handle *, __u64);
+ int (*m_enqueue_async)(struct obd_export *, struct ldlm_enqueue_info *,
+ obd_enqueue_update_f, struct md_op_data *,
+ const union ldlm_policy_data *, __u64);
+
int (*m_getattr)(struct obd_export *, struct md_op_data *,
struct ptlrpc_request **);
LPROC_MD_CLOSE,
LPROC_MD_CREATE,
LPROC_MD_ENQUEUE,
+ LPROC_MD_ENQUEUE_ASYNC,
LPROC_MD_GETATTR,
LPROC_MD_INTENT_LOCK,
LPROC_MD_LINK,
extra_lock_flags);
}
+static inline int md_enqueue_async(struct obd_export *exp,
+ struct ldlm_enqueue_info *einfo,
+ obd_enqueue_update_f upcall,
+ struct md_op_data *op_data,
+ const union ldlm_policy_data *policy,
+ __u64 lock_flags)
+{
+ int rc;
+
+ ENTRY;
+ rc = exp_check_ops(exp);
+ if (rc)
+ RETURN(rc);
+
+ lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
+ LPROC_MD_ENQUEUE_ASYNC);
+
+ rc = exp->exp_obd->obd_type->typ_md_ops->m_enqueue_async(exp, einfo,
+ upcall, op_data,
+ policy, lock_flags);
+ RETURN(rc);
+}
+
static inline int md_getattr_name(struct obd_export *exp,
struct md_op_data *op_data,
struct ptlrpc_request **request)
lock->l_policy_data.l_flock.start));
}
+static int ldlm_flocks_are_equal(struct ldlm_lock *l1, struct ldlm_lock *l2)
+{
+ return ldlm_same_flock_owner(l1, l2) &&
+ l1->l_policy_data.l_flock.start ==
+ l2->l_policy_data.l_flock.start &&
+ l1->l_policy_data.l_flock.end ==
+ l2->l_policy_data.l_flock.end;
+}
+
static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
struct ldlm_lock *lock)
{
if (end < OBD_OBJECT_EOF)
end++;
}
+
+ if (*flags != LDLM_FL_WAIT_NOREPROC && mode == LCK_NL) {
+		/* An unlock (LCK_NL) request: scan the waiting list for a
+		 * matching lock and cancel it on the server.
+		 */
+#ifdef HAVE_SERVER_SUPPORT
+ list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
+ LASSERT(lock->l_req_mode != LCK_NL);
+
+ if (ldlm_flocks_are_equal(req, lock)) {
+				/* Start cancelling a waiting lock */
+ LIST_HEAD(rpc_list);
+
+ LDLM_DEBUG(lock, "server-side: cancel waiting");
+ /* client receives cancelled lock as granted
+ * with l_granted_mode == 0
+ */
+ LASSERT(lock->l_granted_mode == LCK_MINMODE);
+ lock->l_flags |= LDLM_FL_AST_SENT;
+ ldlm_resource_unlink_lock(lock);
+ ldlm_add_ast_work_item(lock, NULL, &rpc_list);
+ LDLM_LOCK_GET(lock);
+ unlock_res_and_lock(req);
+ ldlm_run_ast_work(ns, &rpc_list,
+ LDLM_WORK_CP_AST);
+ ldlm_lock_cancel(lock);
+ LDLM_LOCK_RELEASE(lock);
+ lock_res_and_lock(req);
+ break;
+ }
+ }
+#else /* !HAVE_SERVER_SUPPORT */
+		/* The only case where the flock policy function is called on
+		 * the client is from ldlm_flock_completion_ast(), which always
+		 * passes LDLM_FL_WAIT_NOREPROC.
+		 */
+ CERROR("Illegal parameter for client-side-only module.\n");
+ LBUG();
+#endif /* HAVE_SERVER_SUPPORT */
+ }
if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
/* This loop collects all overlapping locks with the
* same owner.
for (lock = ownlocks; lock; lock = nextlock) {
nextlock = lock->l_same_owner;
+			/* The lock was granted by ldlm_lock_enqueue() but has
+			 * not been processed yet.
+			 */
+ if (*flags == LDLM_FL_WAIT_NOREPROC && lock->l_ast_data)
+ continue;
+
if (lock->l_granted_mode == mode) {
/*
* If the modes are the same then we need to process
RETURN(LDLM_ITER_CONTINUE);
}
+static void ldlm_flock_mark_canceled(struct ldlm_lock *lock)
+{
+ struct ldlm_flock_info *args;
+ struct ldlm_lock *waiting_lock = NULL;
+ struct ldlm_resource *res = lock->l_resource;
+
+ ENTRY;
+ check_res_locked(res);
+ list_for_each_entry(waiting_lock, &res->lr_enqueueing, l_res_link) {
+ if (ldlm_flocks_are_equal(waiting_lock, lock)) {
+ LDLM_DEBUG(lock, "mark canceled enqueueing lock");
+ args = waiting_lock->l_ast_data;
+ if (args)
+ args->fa_flags |= FA_FL_CANCELED;
+ RETURN_EXIT;
+ }
+ }
+ list_for_each_entry(waiting_lock, &res->lr_waiting, l_res_link) {
+ if (ldlm_flocks_are_equal(waiting_lock, lock)) {
+ LDLM_DEBUG(lock, "mark canceled waiting lock");
+ args = waiting_lock->l_ast_data;
+ if (args)
+ args->fa_flags |= FA_FL_CANCELED;
+ RETURN_EXIT;
+ }
+ }
+ EXIT;
+}
+
+static int ldlm_flock_completion_common(struct ldlm_lock *lock)
+{
+ struct ldlm_flock_info *args = lock->l_ast_data;
+ int rc = 0;
+
+ /* Protect against race where lock could have been just destroyed
+ * due to overlap in ldlm_process_flock_lock().
+ */
+ if (lock->l_flags & LDLM_FL_DESTROYED) {
+ LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
+ return -EIO;
+ }
+
+ /* Import invalidation. We need to actually release the lock
+ * references being held, so that it can go away. No point in
+ * holding the lock even if app still believes it has it, since
+ * server already dropped it anyway. Only for granted locks too.
+ * Do the same for DEADLOCK'ed locks.
+ */
+ if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
+ enum ldlm_mode mode = args ?
+ args->fa_mode : lock->l_granted_mode;
+
+ /* args is NULL only for granted locks */
+ LASSERT(args != NULL ||
+ lock->l_req_mode == lock->l_granted_mode);
+
+ if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
+ LDLM_DEBUG(lock,
+ "client-side enqueue deadlock received");
+ rc = -EDEADLK;
+ } else {
+ LDLM_DEBUG(lock, "client-side lock cleanup");
+ rc = -EIO;
+ }
+ ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
+ }
+
+ return rc;
+}
+
/**
* Flock completion callback function.
*
*
* \retval 0 : success
* \retval <0 : failure
+ *
+ * This function is called from:
+ * 1. ldlm_cli_enqueue_fini()
+ * a) grant a new lock or UNLOCK(l_granted_mode == LCK_NL) lock
+ * b) TEST lock, l_flags & LDLM_FL_TEST_LOCK; if can be granted
+ * server returns a conflicting lock, otherwise
+ * l_granted_mode == LCK_NL
+ * 2. ldlm_handle_cp_callback()
+ * a) grant a new lock
+ * b) cancel a DEADLOCK'ed lock, l_flags & LDLM_FL_FLOCK_DEADLOCK,
+ * l_granted_mode == 0
+ *   c) cancel an async waiting lock (F_CANCELLK), fa_flags & FA_FL_CANCELED,
+ * l_granted_mode == 0
+ * 3. cleanup_resource() (called only for the forced umount case)
+ * a) a granted or waiting lock is to be destroyed,
+ * lock->l_flags & flags have LDLM_FL_FAILED.
+ * 4. races between the 3 above
+ * a) cleanup vs. reply or CP AST
+ * b) F_CANCELLK vs. CP AST granting a new lock
*/
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
- struct file_lock *getlk = lock->l_ast_data;
+ struct ldlm_flock_info *args;
struct obd_device *obd;
enum ldlm_error err;
int rc = 0;
unlock_res_and_lock(lock);
CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
}
- CDEBUG(D_DLMTRACE, "flags: %#llx data: %p getlk: %p\n",
- flags, data, getlk);
+ CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p l_ast_data: %p\n",
+ flags, data, lock->l_ast_data);
LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
lock_res_and_lock(lock);
-
- /* Protect against race where lock could have been just destroyed
- * due to overlap in ldlm_process_flock_lock().
- */
- if (ldlm_is_destroyed(lock)) {
- unlock_res_and_lock(lock);
- LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
-
- /* error is returned up to ldlm_cli_enqueue_fini() caller. */
- RETURN(-EIO);
- }
-
- /* ldlm_lock_enqueue() has already placed lock on the granted list. */
- ldlm_resource_unlink_lock(lock);
-
- /* Import invalidation. We need to actually release the lock
- * references being held, so that it can go away. No point in
- * holding the lock even if app still believes it has it, since
- * server already dropped it anyway. Only for granted locks too.
- */
- /* Do the same for DEADLOCK'ed locks. */
- if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
- int mode;
-
- if (flags & LDLM_FL_TEST_LOCK)
- LASSERT(ldlm_is_test_lock(lock));
-
- if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
- mode = getlk->C_FLC_TYPE;
- else
- mode = lock->l_req_mode;
-
- if (ldlm_is_flock_deadlock(lock)) {
- LDLM_DEBUG(lock,
- "client-side enqueue deadlock received");
- rc = -EDEADLK;
- }
- ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
+ rc = ldlm_flock_completion_common(lock);
+ if (rc) {
+ lock->l_ast_data = NULL;
unlock_res_and_lock(lock);
/* Need to wake up the waiter if we were evicted */
/* An error is still to be returned, to propagate it up to
* ldlm_cli_enqueue_fini() caller.
*/
- RETURN(rc ? : -EIO);
+ RETURN(rc);
+ }
+
+ args = lock->l_ast_data;
+
+ if (lock->l_granted_mode == LCK_MINMODE) {
+ ldlm_flock_destroy(lock, args->fa_mode, LDLM_FL_WAIT_NOREPROC);
+ lock->l_ast_data = NULL;
+ unlock_res_and_lock(lock);
+		CERROR("%s: client-side: only an asynchronous lock enqueue can be cancelled by F_CANCELLK\n",
+		       lock->l_conn_export->exp_obd->obd_name);
+ RETURN(-EIO);
+ }
+
+ if (args->fa_flags & FA_FL_CANCEL_RQST) {
+ LDLM_DEBUG(lock, "client-side granted CANCELK lock");
+ ldlm_flock_mark_canceled(lock);
}
LDLM_DEBUG(lock, "client-side enqueue granted");
if (flags & LDLM_FL_TEST_LOCK) {
- /*
- * fcntl(F_GETLK) request
- * The old mode was saved in getlk->C_FLC_TYPE so that if the mode
- * in the lock changes we can decref the appropriate refcount.
- */
+ struct file_lock *getlk = args->fa_fl;
+ /* fcntl(F_GETLK) request */
LASSERT(ldlm_is_test_lock(lock));
- ldlm_flock_destroy(lock, getlk->C_FLC_TYPE, LDLM_FL_WAIT_NOREPROC);
+ ldlm_flock_destroy(lock, args->fa_mode, LDLM_FL_WAIT_NOREPROC);
+
switch (lock->l_granted_mode) {
case LCK_PR:
getlk->C_FLC_TYPE = F_RDLCK;
} else {
__u64 noreproc = LDLM_FL_WAIT_NOREPROC;
+ /* ldlm_lock_enqueue() has already placed lock on the granted
+ * list.
+ */
+ ldlm_resource_unlink_lock(lock);
+
/* We need to reprocess the lock to do merges or splits
* with existing locks owned by this process.
*/
ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
}
+ lock->l_ast_data = NULL;
unlock_res_and_lock(lock);
RETURN(rc);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);
+/* This function is called in the same cases as ldlm_flock_completion_ast(),
+ * except for UNLOCK, TEST lock and F_CANCELLK, which use only the
+ * synchronous mechanism.
+ */
+struct ldlm_flock_info *
+ldlm_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
+{
+ __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
+ enum ldlm_error err;
+ int rc;
+ struct ldlm_flock_info *args;
+
+ ENTRY;
+ LDLM_DEBUG(lock, "flags: 0x%llx data: %p l_ast_data: %p",
+ flags, data, lock->l_ast_data);
+
+ LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
+
+ lock_res_and_lock(lock);
+
+ args = lock->l_ast_data;
+ rc = ldlm_flock_completion_common(lock);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ if (lock->l_granted_mode != LCK_NL) {
+ if (args == NULL) {
+ LDLM_DEBUG(lock,
+ "client-side lock is already granted in a race");
+ LASSERT(lock->l_granted_mode == lock->l_req_mode);
+ LASSERT(lock->l_granted_mode != LCK_MINMODE);
+ GOTO(out, rc = 0);
+ }
+
+ if (args->fa_flags & FA_FL_CANCELED ||
+ ((flags & LDLM_FL_BLOCKED_MASK) == 0 &&
+ lock->l_granted_mode == LCK_MINMODE)) {
+ LDLM_DEBUG(lock, "client-side granted canceled lock");
+ ldlm_flock_destroy(lock, args->fa_mode,
+ LDLM_FL_WAIT_NOREPROC);
+ GOTO(out, rc = -EIO);
+ }
+ }
+
+ if (flags & LDLM_FL_BLOCKED_MASK) {
+ LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock");
+ args = NULL;
+ GOTO(out, rc = 0);
+ }
+
+ if (data != NULL)
+ LDLM_DEBUG(lock, "client-side granted a blocked lock");
+ else
+ LDLM_DEBUG(lock, "client-side lock granted");
+
+ /* ldlm_lock_enqueue() has already placed lock on the granted list. */
+ ldlm_resource_unlink_lock(lock);
+
+ /* We need to reprocess the lock to do merges or splits
+ * with existing locks owned by this process.
+ */
+ ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
+
+out:
+ if (args != NULL) {
+ lock->l_ast_data = NULL;
+ args->fa_err = rc;
+ }
+ unlock_res_and_lock(lock);
+
+ RETURN(args);
+}
+EXPORT_SYMBOL(ldlm_flock_completion_ast_async);
+
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
void *data, int flag)
{
}
EXPORT_SYMBOL(ldlm_enqueue_pack);
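+/* Keep a new FLOCK lock on the resource's lr_enqueueing list so that a
+ * racing F_CANCELLK can find it and mark it canceled before the enqueue
+ * completes (see ldlm_flock_mark_canceled()).
+ */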
+static void ldlm_lock_add_to_enqueueing(struct ldlm_lock *lock)
+{
+ struct ldlm_resource *res = lock->l_resource;
+
+ lock_res(res);
+ ldlm_resource_add_lock(res, &res->lr_enqueueing, lock);
+ unlock_res(res);
+}
+
/**
* Client-side lock enqueue.
*
LBUG();
lock->l_req_extent = policy->l_extent;
+ } else if (einfo->ei_type == LDLM_FLOCK) {
+ ldlm_lock_add_to_enqueueing(lock);
}
LDLM_DEBUG(lock, "client-side enqueue START, flags %#llx",
*flags);
INIT_LIST_HEAD(&res->lr_granted);
INIT_LIST_HEAD(&res->lr_waiting);
+ INIT_LIST_HEAD(&res->lr_enqueueing);
refcount_set(&res->lr_refcount, 1);
spin_lock_init(&res->lr_lock);
LBUG();
}
+ if (!list_empty(&res->lr_enqueueing)) {
+ ldlm_resource_dump(D_ERROR, res);
+ LBUG();
+ }
+
cfs_hash_bd_del_locked(nsb->nsb_namespace->ns_rs_hash,
bd, &res->lr_hash);
if (atomic_dec_and_test(&nsb->nsb_count))
if (res->lr_type == LDLM_IBITS)
ldlm_inodebits_add_lock(res, head, lock, tail);
+ else if (res->lr_type == LDLM_FLOCK)
+ LASSERT(lock->l_req_mode != LCK_NL || head != &res->lr_waiting);
ldlm_resource_dump(D_INFO, res);
}
RETURN(rc);
}
-static int
-ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
+static int ll_file_flc2policy(struct file_lock *file_lock, int cmd,
+ union ldlm_policy_data *flock)
{
- struct inode *inode = file_inode(file);
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct ldlm_enqueue_info einfo = {
- .ei_type = LDLM_FLOCK,
- .ei_cb_cp = ldlm_flock_completion_ast,
- .ei_cbdata = file_lock,
- };
- struct md_op_data *op_data;
- struct lustre_handle lockh = { 0 };
- union ldlm_policy_data flock = { { 0 } };
- struct file_lock flbuf = *file_lock;
- int fl_type = file_lock->C_FLC_TYPE;
- ktime_t kstart = ktime_get();
- __u64 flags = 0;
- int rc;
- int rc2 = 0;
-
ENTRY;
- CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID" file_lock=%p\n",
- PFID(ll_inode2fid(inode)), file_lock);
if (file_lock->C_FLC_FLAGS & FL_FLOCK) {
LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
/* flocks are whole-file locks */
- flock.l_flock.end = OFFSET_MAX;
+ flock->l_flock.end = OFFSET_MAX;
/* For flocks owner is determined by the local file desctiptor*/
- flock.l_flock.owner = (unsigned long)file_lock->C_FLC_FILE;
+ flock->l_flock.owner = (unsigned long)file_lock->C_FLC_FILE;
} else if (file_lock->C_FLC_FLAGS & FL_POSIX) {
- flock.l_flock.owner = (unsigned long)file_lock->C_FLC_OWNER;
- flock.l_flock.start = file_lock->fl_start;
- flock.l_flock.end = file_lock->fl_end;
+ flock->l_flock.owner = (unsigned long)file_lock->C_FLC_OWNER;
+ flock->l_flock.start = file_lock->fl_start;
+ flock->l_flock.end = file_lock->fl_end;
} else {
RETURN(-EINVAL);
}
- flock.l_flock.pid = file_lock->C_FLC_PID;
+ flock->l_flock.pid = file_lock->C_FLC_PID;
#if defined(HAVE_LM_COMPARE_OWNER) || defined(lm_compare_owner)
/* Somewhat ugly workaround for svc lockd.
* pointer space for current->files are not intersecting
*/
if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
- flock.l_flock.owner = (unsigned long)file_lock->C_FLC_PID;
+ flock->l_flock.owner = (unsigned long)file_lock->C_FLC_PID;
+#endif
+
+ RETURN(0);
+}
+
+static int ll_file_flock_lock(struct file *file, struct file_lock *file_lock)
+{
+ int rc = -EINVAL;
+
+	/* We don't need to sleep on conflicting locks.
+	 * This is called in the following cases:
+	 * 1. adding a new lock: no conflicts exist, as it is already granted
+	 *    on the server;
+	 * 2. unlock: never conflicts with anything.
+	 */
+	file_lock->C_FLC_FLAGS &= ~FL_SLEEP;
+#ifdef HAVE_LOCKS_LOCK_FILE_WAIT
+ rc = locks_lock_file_wait(file, file_lock);
+#else
+ if (file_lock->fl_flags & FL_FLOCK) {
+ rc = flock_lock_file_wait(file, file_lock);
+ } else if (file_lock->fl_flags & FL_POSIX) {
+ rc = posix_lock_file(file, file_lock, NULL);
+ }
+#endif /* HAVE_LOCKS_LOCK_FILE_WAIT */
+ if (rc)
+ CDEBUG_LIMIT(rc == -ENOENT ? D_DLMTRACE : D_ERROR,
+ "kernel lock failed: rc = %d\n", rc);
+
+ return rc;
+}
+
+static int ll_flock_upcall(void *cookie, int err);
+static int
+ll_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data);
+
+static int ll_file_flock_async_unlock(struct inode *inode,
+ struct file_lock *file_lock)
+{
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
+ .ei_cb_cp =
+ ll_flock_completion_ast_async,
+ .ei_mode = LCK_NL,
+ .ei_cbdata = NULL };
+ union ldlm_policy_data flock = { {0} };
+ struct md_op_data *op_data;
+ int rc;
+
+ ENTRY;
+ rc = ll_file_flc2policy(file_lock, F_SETLK, &flock);
+ if (rc)
+ RETURN(rc);
+
+ op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
+ LUSTRE_OPC_ANY, NULL);
+ if (IS_ERR(op_data))
+ RETURN(PTR_ERR(op_data));
+
+ rc = md_enqueue_async(sbi->ll_md_exp, &einfo, ll_flock_upcall,
+ op_data, &flock, 0);
+
+ ll_finish_md_op_data(op_data);
+
+ RETURN(rc);
+}
+
+/* This function is called only once, after the LDLM callback. The args are
+ * already detached from the lock, so no locking is needed.
+ * It only reports the lock status to the kernel.
+ */
+static void ll_file_flock_async_cb(struct ldlm_flock_info *args)
+{
+ struct file_lock *file_lock = args->fa_fl;
+ struct file_lock *flc = &args->fa_flc;
+ struct file *file = args->fa_file;
+ struct inode *inode = file->f_path.dentry->d_inode;
+ int err = args->fa_err;
+ int rc;
+
+ ENTRY;
+ CDEBUG(D_INFO, "err=%d file_lock=%p file=%p start=%llu end=%llu\n",
+ err, file_lock, file, flc->fl_start, flc->fl_end);
+
+	/* The kernel is responsible for resolving grant vs. F_CANCELLK and
+	 * grant vs. cleanup races. The CANCELED flag may not be set even
+	 * though err == 0, because F_CANCELLK/cleanup can happen between
+	 * ldlm_flock_completion_ast_async() and ll_flock_run_flock_cb().
+	 * In that case notify() returns an error for the already cancelled
+	 * flock.
+	 */
+ if (!(args->fa_flags & FA_FL_CANCELED)) {
+ struct file_lock notify_lock;
+
+ locks_init_lock(¬ify_lock);
+ locks_copy_lock(¬ify_lock, flc);
+
+ if (err == 0)
+ ll_file_flock_lock(file, flc);
+
+ wait_event_idle(args->fa_waitq, args->fa_ready);
+
+#ifdef HAVE_LM_GRANT_2ARGS
+ rc = args->fa_notify(¬ify_lock, err);
+#else
+ rc = args->fa_notify(¬ify_lock, NULL, err);
#endif
+ if (rc) {
+ CDEBUG_LIMIT(D_ERROR,
+ "notify failed file_lock=%p err=%d\n",
+ file_lock, err);
+ if (err == 0) {
+ flc->C_FLC_TYPE = F_UNLCK;
+ ll_file_flock_lock(file, flc);
+ ll_file_flock_async_unlock(inode, flc);
+ }
+ }
+ }
+
+ fput(file);
+
+ EXIT;
+}
+
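+/* Run the one-shot flock callback and free the detached args. */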
+static void ll_flock_run_flock_cb(struct ldlm_flock_info *args)
+{
+ if (args) {
+ ll_file_flock_async_cb(args);
+ OBD_FREE_PTR(args);
+ }
+}
+
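+/* Enqueue upcall, called from the mdc interpret callback with the LDLM
+ * lock as cookie; on enqueue failure detach the args and complete the
+ * flock request with the error.
+ */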
+static int ll_flock_upcall(void *cookie, int err)
+{
+ struct ldlm_flock_info *args;
+ struct ldlm_lock *lock = cookie;
+
+ if (err != 0) {
+ CERROR("ldlm_cli_enqueue_fini lock=%p : rc = %d\n", lock, err);
+
+ lock_res_and_lock(lock);
+ args = lock->l_ast_data;
+ lock->l_ast_data = NULL;
+ unlock_res_and_lock(lock);
+
+ if (args)
+ args->fa_err = err;
+ ll_flock_run_flock_cb(args);
+ }
+
+ return 0;
+}
+
+static int
+ll_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
+{
+ struct ldlm_flock_info *args;
+
+ ENTRY;
+
+ args = ldlm_flock_completion_ast_async(lock, flags, data);
+ if (args && args->fa_flags & FA_FL_CANCELED) {
+ /* lock was cancelled in a race */
+ struct inode *inode = args->fa_file->f_path.dentry->d_inode;
+
+ ll_file_flock_async_unlock(inode, &args->fa_flc);
+ }
+
+ ll_flock_run_flock_cb(args);
+
+ RETURN(0);
+}
+
+static int
+ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
+{
+ struct inode *inode = file_inode(file);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ldlm_enqueue_info einfo = {
+ .ei_type = LDLM_FLOCK,
+ .ei_cb_cp = ldlm_flock_completion_ast,
+ .ei_cbdata = NULL,
+ };
+ struct md_op_data *op_data;
+ struct lustre_handle lockh = { 0 };
+ union ldlm_policy_data flock = { { 0 } };
+ struct file_lock flbuf = *file_lock;
+ int fl_type = file_lock->C_FLC_TYPE;
+ ktime_t kstart = ktime_get();
+ __u64 flags = 0;
+ struct ldlm_flock_info *cb_data = NULL;
+ int rc;
+
+ ENTRY;
+ CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID" file_lock=%p\n",
+ PFID(ll_inode2fid(inode)), file_lock);
+
+ ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
+
+ rc = ll_file_flc2policy(file_lock, cmd, &flock);
+ if (rc)
+ RETURN(rc);
switch (fl_type) {
case F_RDLCK:
*/
posix_test_lock(file, &flbuf);
break;
+ case F_CANCELLK:
+ CDEBUG(D_DLMTRACE, "F_CANCELLK owner=%llx %llu-%llu\n",
+ flock.l_flock.owner, flock.l_flock.start,
+ flock.l_flock.end);
+ file_lock->C_FLC_TYPE = F_UNLCK;
+ einfo.ei_mode = LCK_NL;
+ break;
default:
rc = -EINVAL;
CERROR("%s: fcntl from '%s' unknown lock command=%d: rc = %d\n",
RETURN(rc);
}
- /* Save the old mode so that if the mode in the lock changes we
- * can decrement the appropriate reader or writer refcount.
- */
- file_lock->C_FLC_TYPE = einfo.ei_mode;
+ CDEBUG(D_DLMTRACE,
+ "inode="DFID", pid=%u, owner=%#llx, flags=%#llx, mode=%u, start=%llu, end=%llu\n",
+ PFID(ll_inode2fid(inode)), flock.l_flock.pid,
+ flock.l_flock.owner, flags, einfo.ei_mode,
+ flock.l_flock.start, flock.l_flock.end);
op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
- CDEBUG(D_DLMTRACE,
- "inode="DFID", pid=%u, flags=%#llx, mode=%u, start=%llu, end=%llu\n",
- PFID(ll_inode2fid(inode)),
- flock.l_flock.pid, flags, einfo.ei_mode,
- flock.l_flock.start, flock.l_flock.end);
+ OBD_ALLOC_PTR(cb_data);
+ if (!cb_data)
+ GOTO(out, rc = -ENOMEM);
- rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data, &lockh,
- flags);
+ cb_data->fa_file = file;
+ cb_data->fa_fl = file_lock;
+ cb_data->fa_mode = einfo.ei_mode;
+ init_waitqueue_head(&cb_data->fa_waitq);
+ locks_init_lock(&cb_data->fa_flc);
+ locks_copy_lock(&cb_data->fa_flc, file_lock);
+ if (cmd == F_CANCELLK)
+ cb_data->fa_flags |= FA_FL_CANCEL_RQST;
+ einfo.ei_cbdata = cb_data;
+
+ if (file_lock->fl_lmops && file_lock->fl_lmops->lm_grant &&
+ file_lock->C_FLC_TYPE != F_UNLCK &&
+ flags == LDLM_FL_BLOCK_NOWAIT /* F_SETLK/F_SETLK64 */) {
+ cb_data->fa_notify = file_lock->fl_lmops->lm_grant;
+		flags = (file_lock->C_FLC_FLAGS & FL_SLEEP) ?
+ 0 : LDLM_FL_BLOCK_NOWAIT;
+ einfo.ei_cb_cp = ll_flock_completion_ast_async;
+ get_file(file);
+
+ rc = md_enqueue_async(sbi->ll_md_exp, &einfo,
+ ll_flock_upcall, op_data, &flock, flags);
+ if (rc) {
+ fput(file);
+ OBD_FREE_PTR(cb_data);
+ cb_data = NULL;
+ } else {
+ rc = FILE_LOCK_DEFERRED;
+ }
+ } else {
+ if (file_lock->C_FLC_TYPE == F_UNLCK &&
+ flags != LDLM_FL_TEST_LOCK) {
+			/* Release the kernel lock before the LDLM one to
+			 * avoid a race with reordering of unlock and lock
+			 * responses from the server.
+			 */
+			cb_data->fa_flc.C_FLC_FLAGS |= FL_EXISTS;
+ rc = ll_file_flock_lock(file, &cb_data->fa_flc);
+ if (rc) {
+ if (rc == -ENOENT) {
+					if (!(file_lock->C_FLC_FLAGS &
+ FL_EXISTS))
+ rc = 0;
+ } else {
+ CDEBUG_LIMIT(D_ERROR,
+ "local unlock failed rc=%d\n",
+ rc);
+ }
+ OBD_FREE_PTR(cb_data);
+ cb_data = NULL;
+ GOTO(out, rc);
+ }
+ }
- /* Restore the file lock type if not TEST lock. */
- if (!(flags & LDLM_FL_TEST_LOCK))
- file_lock->C_FLC_TYPE = fl_type;
+ rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data,
+ &lockh, flags);
-#ifdef HAVE_LOCKS_LOCK_FILE_WAIT
- if ((rc == 0 || file_lock->C_FLC_TYPE == F_UNLCK) &&
- !(flags & LDLM_FL_TEST_LOCK))
- rc2 = locks_lock_file_wait(file, file_lock);
-#else
- if ((file_lock->C_FLC_FLAGS & FL_FLOCK) &&
- (rc == 0 || file_lock->C_FLC_TYPE == F_UNLCK))
- rc2 = flock_lock_file_wait(file, file_lock);
- if ((file_lock->C_FLC_FLAGS & FL_POSIX) &&
- (rc == 0 || file_lock->C_FLC_TYPE == F_UNLCK) &&
- !(flags & LDLM_FL_TEST_LOCK))
- rc2 = posix_lock_file_wait(file, file_lock);
-#endif /* HAVE_LOCKS_LOCK_FILE_WAIT */
- if (rc2 && file_lock->C_FLC_TYPE != F_UNLCK) {
- einfo.ei_mode = LCK_NL;
- md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data,
- &lockh, flags);
- rc = rc2;
- }
+ if (!rc && file_lock->C_FLC_TYPE != F_UNLCK &&
+ !(flags & LDLM_FL_TEST_LOCK)) {
+			int rc2;
+
+			rc2 = ll_file_flock_lock(file, file_lock);
+ if (rc2) {
+ einfo.ei_mode = LCK_NL;
+ cb_data->fa_mode = einfo.ei_mode;
+ md_enqueue(sbi->ll_md_exp, &einfo, &flock,
+ op_data, &lockh, flags);
+ rc = rc2;
+ }
+ }
+ OBD_FREE_PTR(cb_data);
+ cb_data = NULL;
+ }
+out:
ll_finish_md_op_data(op_data);
+ if (cb_data) {
+ cb_data->fa_ready = 1;
+ wake_up(&cb_data->fa_waitq);
+ }
if (rc == 0 && (flags & LDLM_FL_TEST_LOCK) &&
flbuf.C_FLC_TYPE != file_lock->C_FLC_TYPE) { /* Verify local & remote */
RETURN(rc);
}
+static int
+lmv_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ obd_enqueue_update_f upcall, struct md_op_data *op_data,
+ const union ldlm_policy_data *policy, __u64 flags)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+
+ ENTRY;
+
+ CDEBUG(D_INODE, "ENQUEUE ASYNC on "DFID"\n",
+ PFID(&op_data->op_fid1));
+
+ tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ CDEBUG(D_INODE, "ENQUEUE ASYNC on "DFID" -> mds #%d\n",
+ PFID(&op_data->op_fid1), tgt->ltd_index);
+
+ rc = md_enqueue_async(tgt->ltd_exp, einfo, upcall, op_data, policy,
+ flags);
+
+ RETURN(rc);
+}
+
int
lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **preq)
.m_close = lmv_close,
.m_create = lmv_create,
.m_enqueue = lmv_enqueue,
+ .m_enqueue_async = lmv_enqueue_async,
.m_getattr = lmv_getattr,
.m_getxattr = lmv_getxattr,
.m_getattr_name = lmv_getattr_name,
const union ldlm_policy_data *policy,
struct md_op_data *op_data,
struct lustre_handle *lockh, __u64 extra_lock_flags);
+
+int mdc_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ obd_enqueue_update_f upcall, struct md_op_data *op_data,
+ const union ldlm_policy_data *policy, __u64 lock_flags);
+
int mdc_resource_get_unused_res(struct obd_export *exp,
struct ldlm_res_id *res_id,
struct list_head *cancels,
struct md_op_item *ga_item;
};
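+/* Asynchronous enqueue arguments stored in the ptlrpc request and used
+ * by mdc_enqueue_async_interpret().
+ */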
+struct mdc_enqueue_args {
+ struct ldlm_lock *mea_lock;
+ struct obd_export *mea_exp;
+ enum ldlm_mode mea_mode;
+ __u64 mea_flags;
+ obd_enqueue_update_f mea_upcall;
+};
+
int it_open_error(int phase, struct lookup_intent *it)
{
if (it_disposition(it, DISP_OPEN_LEASE)) {
op_data, lockh, extra_lock_flags);
}
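+/* ptlrpcd interpret callback for an asynchronous flock enqueue: finish
+ * the enqueue and pass the result to the stored upcall.
+ */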
+static int mdc_enqueue_async_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ void *args, int rc)
+{
+ struct mdc_enqueue_args *mea = args;
+ struct obd_export *exp = mea->mea_exp;
+ struct ldlm_lock *lock = mea->mea_lock;
+ struct lustre_handle lockh;
+ struct ldlm_enqueue_info einfo = {
+ .ei_type = LDLM_FLOCK,
+ .ei_mode = mea->mea_mode,
+ };
+
+ ENTRY;
+ CDEBUG(D_INFO, "req=%p rc=%d\n", req, rc);
+
+ ldlm_lock2handle(lock, &lockh);
+ rc = ldlm_cli_enqueue_fini(exp, &req->rq_pill, &einfo, 1,
+ &mea->mea_flags, NULL, 0, &lockh, rc, true);
+ if (rc == -ENOLCK)
+ LDLM_LOCK_RELEASE(lock);
+
+ /* we expect failed_lock_cleanup() to destroy lock */
+ if (rc != 0)
+ LASSERT(list_empty(&lock->l_res_link));
+
+ if (mea->mea_upcall != NULL)
+ mea->mea_upcall(lock, rc);
+
+ LDLM_LOCK_PUT(lock);
+
+ RETURN(rc);
+}
+
+int mdc_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ obd_enqueue_update_f upcall, struct md_op_data *op_data,
+ const union ldlm_policy_data *policy, __u64 flags)
+{
+ struct mdc_enqueue_args *mea;
+ struct ptlrpc_request *req;
+ int rc;
+ struct ldlm_res_id res_id;
+ struct lustre_handle lockh;
+
+ ENTRY;
+ fid_build_reg_res_name(&op_data->op_fid1, &res_id);
+
+ LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
+ einfo->ei_type);
+ res_id.name[3] = LDLM_FLOCK;
+
+ req = ldlm_enqueue_pack(exp, 0);
+ if (IS_ERR(req))
+ RETURN(PTR_ERR(req));
+
+ einfo->ei_req_slot = 1;
+ einfo->ei_mod_slot = 1;
+
+ rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
+ 0, 0, &lockh, 1);
+ if (rc) {
+ ptlrpc_req_put(req);
+ RETURN(rc);
+ }
+
+ mea = ptlrpc_req_async_args(mea, req);
+ mea->mea_exp = exp;
+ mea->mea_lock = ldlm_handle2lock(&lockh);
+ LASSERT(mea->mea_lock != NULL);
+
+ mea->mea_mode = einfo->ei_mode;
+ mea->mea_flags = flags;
+ mea->mea_upcall = upcall;
+
+ req->rq_interpret_reply = mdc_enqueue_async_interpret;
+ ptlrpcd_add_req(req);
+
+ RETURN(0);
+}
+
static int mdc_finish_intent_lock(struct obd_export *exp,
struct ptlrpc_request *request,
struct md_op_data *op_data,
.m_close = mdc_close,
.m_create = mdc_create,
.m_enqueue = mdc_enqueue,
+ .m_enqueue_async = mdc_enqueue_async,
.m_getattr = mdc_getattr,
.m_getattr_name = mdc_getattr_name,
.m_intent_lock = mdc_intent_lock,
[LPROC_MD_CLOSE] = "close",
[LPROC_MD_CREATE] = "create",
[LPROC_MD_ENQUEUE] = "enqueue",
+ [LPROC_MD_ENQUEUE_ASYNC] = "enqueue_async",
[LPROC_MD_GETATTR] = "getattr",
[LPROC_MD_INTENT_LOCK] = "intent_lock",
[LPROC_MD_LINK] = "link",