From 3177b0dc5d18a8e3d77eb10fa6f266ff83cbe222 Mon Sep 17 00:00:00 2001
From: Andriy Skulysh
Date: Tue, 5 Feb 2019 15:37:48 +0200
Subject: [PATCH] LU-2525 ldlm: add asynchronous flocks

Add support for asynchronous flocks. For now they are used only by
the Linux nfsd.

HPE-bug-id: LUS-3210, LUS-7034, LUS-7031, LUS-8832, LUS-8313
HPE-bug-id: LUS-8592
Change-Id: Iefafaf014fd06d569dc5d1dd22ebb3518d04e99a
Reviewed-by: Vitaly Fertman
Reviewed-by: Alexander Boyko
Signed-off-by: Andriy Skulysh
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/4889
Tested-by: jenkins
Tested-by: Maloo
Reviewed-by: Alexander Boyko
Reviewed-by: Alexey Lyashkov
Reviewed-by: Vitaly Fertman
Reviewed-by: Oleg Drokin
---
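[ Reviewer note, kept below the "---" so "git am" ignores it; not part of
  the commit. A minimal sketch of the caller-side contract the llite
  changes implement: a ->lock() method that cannot grant immediately
  returns FILE_LOCK_DEFERRED and reports the final status later through
  fl_lmops->lm_grant(), which is how Linux lockd/nfsd consumes flocks.
  vfs_lock_file(), FILE_LOCK_DEFERRED and lm_grant() are real kernel
  interfaces; my_grant(), my_try_lock() and deliver_grant_to_client()
  are hypothetical names for illustration only.

    #include <linux/fs.h>
    #ifdef HAVE_LINUX_FILELOCK_HEADER
    #include <linux/filelock.h>
    #endif

    static int deliver_grant_to_client(struct file_lock *fl, int result);

    /* Called back (from ll_file_flock_async_cb() in this patch) once the
     * deferred request is granted (result == 0) or failed (result < 0).
     * A non-zero return tells the filesystem the grant could not be
     * delivered, and the patch then unlocks again.
     */
    #ifdef HAVE_LM_GRANT_2ARGS
    static int my_grant(struct file_lock *fl, int result)
    #else
    static int my_grant(struct file_lock *fl, struct file_lock *conf,
                        int result)
    #endif
    {
            return deliver_grant_to_client(fl, result);
    }

    static const struct lock_manager_operations my_lm_ops = {
            .lm_grant = my_grant,
    };

    static int my_try_lock(struct file *file, struct file_lock *fl)
    {
            int rc;

            fl->fl_lmops = &my_lm_ops;
            fl->fl_flags |= FL_SLEEP;       /* blocking request */
            rc = vfs_lock_file(file, F_SETLK, fl, NULL);
            if (rc == FILE_LOCK_DEFERRED)
                    return 0;               /* my_grant() reports later */
            return rc;                      /* immediate grant or error */
    }
]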
 lustre/autoconf/lustre-core.m4   |  20 +++
 lustre/include/lustre_dlm.h      |  29 ++++
 lustre/include/obd.h             |   4 +
 lustre/include/obd_class.h       |  24 +++
 lustre/ldlm/ldlm_flock.c         | 299 ++++++++++++++++++++++++++------
 lustre/ldlm/ldlm_request.c       |  11 ++
 lustre/ldlm/ldlm_resource.c      |   8 +
 lustre/llite/file.c              | 361 ++++++++++++++++++++++++++++++++-------
 lustre/lmv/lmv_obd.c             |  29 ++++
 lustre/mdc/mdc_internal.h        |   5 +
 lustre/mdc/mdc_locks.c           |  88 ++++++++++
 lustre/mdc/mdc_request.c         |   1 +
 lustre/obdclass/lprocfs_status.c |   1 +
 13 files changed, 771 insertions(+), 109 deletions(-)

diff --git a/lustre/autoconf/lustre-core.m4 b/lustre/autoconf/lustre-core.m4
index d955dc2..794dfba 100644
--- a/lustre/autoconf/lustre-core.m4
+++ b/lustre/autoconf/lustre-core.m4
@@ -1146,6 +1146,23 @@ AC_DEFUN([LC_HAVE_BLK_INTEGRITY_ITER], [
 ]) # LC_HAVE_BLK_INTEGRITY_ITER
 
 #
+# LC_HAVE_LM_GRANT_2ARGS
+#
+# 3.17 removed unused argument from lm_grant
+#
+AC_DEFUN([LC_HAVE_LM_GRANT_2ARGS], [
+LB_CHECK_COMPILE([if 'lock_manager_operations.lm_grant' takes two args],
+lm_grant, [
+        #include <linux/fs.h>
+],[
+        ((struct lock_manager_operations *)NULL)->lm_grant(NULL, 0);
+],[
+        AC_DEFINE(HAVE_LM_GRANT_2ARGS, 1,
+                [lock_manager_operations.lm_grant takes two args])
+])
+]) # LC_HAVE_LM_GRANT_2ARGS
+
+#
 # LC_NFS_FILLDIR_USE_CTX
 #
 # 3.18 kernel moved from void cookie to struct dir_context
@@ -4255,6 +4272,8 @@ AC_DEFUN([LC_HAVE_LOCKS_LOCK_FILE_WAIT_IN_FILELOCK], [
                 [kernel has locks_lock_file_wait in filelock.h])
         AC_DEFINE(HAVE_LINUX_FILELOCK_HEADER, 1,
                 [linux/filelock.h is present])
+        AC_DEFINE(HAVE_LM_GRANT_2ARGS, 1,
+                [lock_manager_operations.lm_grant takes two args])
 ])
 ]) # LC_HAVE_LOCKS_LOCK_FILE_WAIT_IN_FILELOCK
@@ -5079,6 +5098,7 @@ AC_DEFUN([LC_PROG_LINUX_RESULTS], [
         LC_HAVE_INTERVAL_BLK_INTEGRITY
         LC_KEY_MATCH_DATA
         LC_HAVE_BLK_INTEGRITY_ITER
+        LC_HAVE_LM_GRANT_2ARGS
 
         # 3.18
         LC_PERCPU_COUNTER_INIT
diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 0a962a5..cd4d6f0 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -30,6 +30,9 @@
 #include <lustre_lib.h>
 #include <lustre_net.h>
 #include <lustre_import.h>
+#ifdef HAVE_LINUX_FILELOCK_HEADER
+#include <linux/filelock.h>
+#endif
 
 #include "lustre_dlm_flags.h"
@@ -1113,6 +1116,8 @@ struct ldlm_resource {
          * that are waiting for conflicts to go away */
         struct list_head        lr_waiting;
+        /* List of flock locks waiting to be enqueued */
+        struct list_head        lr_enqueueing;
         /** @} */
 
         /** Resource name */
@@ -1287,6 +1292,27 @@ struct ldlm_enqueue_info {
 
 #define ei_res_id       ei_cb_gl
 
+enum ldlm_flock_flags {
+        FA_FL_CANCEL_RQST       = 1,
+        FA_FL_CANCELED          = 2,
+};
+
+struct ldlm_flock_info {
+        struct file             *fa_file;
+        struct file_lock        *fa_fl;         /* original file_lock */
+        struct file_lock         fa_flc;        /* lock copy */
+        enum ldlm_flock_flags    fa_flags;
+        enum ldlm_mode           fa_mode;
+#ifdef HAVE_LM_GRANT_2ARGS
+        int (*fa_notify)(struct file_lock *, int);
+#else
+        int (*fa_notify)(struct file_lock *, struct file_lock *, int);
+#endif
+        int                      fa_err;
+        int                      fa_ready;
+        wait_queue_head_t        fa_waitq;
+};
+
 extern char *ldlm_lockname[];
 extern char *ldlm_typename[];
 extern const char *ldlm_it2str(enum ldlm_intent_flags it);
@@ -1421,6 +1447,9 @@ int ldlm_replay_locks(struct obd_import *imp);
 
 /* ldlm_flock.c */
 int ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data);
+struct ldlm_flock_info *
+ldlm_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags,
+                                void *data);
 
 /* ldlm_extent.c */
 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms);
diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index 345327e..1e60378 100644
--- a/lustre/include/obd.h
+++ b/lustre/include/obd.h
@@ -1244,6 +1244,10 @@ struct md_ops {
                       const union ldlm_policy_data *, struct md_op_data *,
                       struct lustre_handle *, __u64);
 
+        int (*m_enqueue_async)(struct obd_export *, struct ldlm_enqueue_info *,
+                               obd_enqueue_update_f, struct md_op_data *,
+                               const union ldlm_policy_data *, __u64);
+
         int (*m_getattr)(struct obd_export *, struct md_op_data *,
                          struct ptlrpc_request **);
diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h
index 35f486e..c92b930 100644
--- a/lustre/include/obd_class.h
+++ b/lustre/include/obd_class.h
@@ -1393,6 +1393,7 @@ enum mps_stat_idx {
         LPROC_MD_CLOSE,
         LPROC_MD_CREATE,
         LPROC_MD_ENQUEUE,
+        LPROC_MD_ENQUEUE_ASYNC,
         LPROC_MD_GETATTR,
         LPROC_MD_INTENT_LOCK,
         LPROC_MD_LINK,
@@ -1508,6 +1509,29 @@ static inline int md_enqueue(struct obd_export *exp,
                                  extra_lock_flags);
 }
 
+static inline int md_enqueue_async(struct obd_export *exp,
+                                   struct ldlm_enqueue_info *einfo,
+                                   obd_enqueue_update_f upcall,
+                                   struct md_op_data *op_data,
+                                   const union ldlm_policy_data *policy,
+                                   __u64 lock_flags)
+{
+        int rc;
+
+        ENTRY;
+        rc = exp_check_ops(exp);
+        if (rc)
+                RETURN(rc);
+
+        lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
+                             LPROC_MD_ENQUEUE_ASYNC);
+
+        rc = exp->exp_obd->obd_type->typ_md_ops->m_enqueue_async(exp, einfo,
+                                                        upcall, op_data,
+                                                        policy, lock_flags);
+        RETURN(rc);
+}
+
 static inline int md_getattr_name(struct obd_export *exp,
                                   struct md_op_data *op_data,
                                   struct ptlrpc_request **request)
diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
index 5eee474..46cdda0 100644
--- a/lustre/ldlm/ldlm_flock.c
+++ b/lustre/ldlm/ldlm_flock.c
@@ -70,6 +70,15 @@ ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
                  lock->l_policy_data.l_flock.start));
 }
 
+static int ldlm_flocks_are_equal(struct ldlm_lock *l1, struct ldlm_lock *l2)
+{
+        return ldlm_same_flock_owner(l1, l2) &&
+               l1->l_policy_data.l_flock.start ==
+               l2->l_policy_data.l_flock.start &&
+               l1->l_policy_data.l_flock.end ==
+               l2->l_policy_data.l_flock.end;
+}
+
 static inline void
 ldlm_flock_blocking_link(struct ldlm_lock *req, struct ldlm_lock *lock)
 {
@@ -349,6 +358,46 @@ reprocess:
                         if (end < OBD_OBJECT_EOF)
                                 end++;
                 }
+
+        if (*flags != LDLM_FL_WAIT_NOREPROC && mode == LCK_NL) {
+                /* An NL enqueue is a cancel request for a matching
+                 * waiting lock; find this owner's lock on the
+                 * lr_waiting list.
+                 */
+#ifdef HAVE_SERVER_SUPPORT
+                list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
+                        LASSERT(lock->l_req_mode != LCK_NL);
+
+                        if (ldlm_flocks_are_equal(req, lock)) {
+                                /* Start cancelling the waiting lock */
+                                LIST_HEAD(rpc_list);
+
+                                LDLM_DEBUG(lock, "server-side: cancel waiting");
+                                /* the client receives the cancelled lock as
+                                 * granted with l_granted_mode == 0
+                                 */
+                                LASSERT(lock->l_granted_mode == LCK_MINMODE);
+                                lock->l_flags |= LDLM_FL_AST_SENT;
+                                ldlm_resource_unlink_lock(lock);
+                                ldlm_add_ast_work_item(lock, NULL, &rpc_list);
+                                LDLM_LOCK_GET(lock);
+                                unlock_res_and_lock(req);
+                                ldlm_run_ast_work(ns, &rpc_list,
+                                                  LDLM_WORK_CP_AST);
+                                ldlm_lock_cancel(lock);
+                                LDLM_LOCK_RELEASE(lock);
+                                lock_res_and_lock(req);
+                                break;
+                        }
+                }
+#else /* !HAVE_SERVER_SUPPORT */
+                /* The only path by which a client-side-only module calls
+                 * the flock policy function is ldlm_flock_completion_ast(),
+                 * which always passes the LDLM_FL_WAIT_NOREPROC flag.
+                 */
+                CERROR("Illegal parameter for client-side-only module.\n");
+                LBUG();
+#endif /* HAVE_SERVER_SUPPORT */
+        }
 	if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                 /* This loop collects all overlapping locks with the
                  * same owner.
@@ -457,6 +506,12 @@ reprocess:
         for (lock = ownlocks; lock; lock = nextlock) {
                 nextlock = lock->l_same_owner;
 
+                /* lock was granted by ldlm_lock_enqueue()
+                 * but not processed yet
+                 */
+                if (*flags == LDLM_FL_WAIT_NOREPROC && lock->l_ast_data)
+                        continue;
+
                 if (lock->l_granted_mode == mode) {
                         /*
                          * If the modes are the same then we need to process
@@ -633,6 +688,76 @@ restart:
         RETURN(LDLM_ITER_CONTINUE);
 }
 
+static void ldlm_flock_mark_canceled(struct ldlm_lock *lock)
+{
+        struct ldlm_flock_info *args;
+        struct ldlm_lock *waiting_lock = NULL;
+        struct ldlm_resource *res = lock->l_resource;
+
+        ENTRY;
+        check_res_locked(res);
+        list_for_each_entry(waiting_lock, &res->lr_enqueueing, l_res_link) {
+                if (ldlm_flocks_are_equal(waiting_lock, lock)) {
+                        LDLM_DEBUG(lock, "mark canceled enqueueing lock");
+                        args = waiting_lock->l_ast_data;
+                        if (args)
+                                args->fa_flags |= FA_FL_CANCELED;
+                        RETURN_EXIT;
+                }
+        }
+        list_for_each_entry(waiting_lock, &res->lr_waiting, l_res_link) {
+                if (ldlm_flocks_are_equal(waiting_lock, lock)) {
+                        LDLM_DEBUG(lock, "mark canceled waiting lock");
+                        args = waiting_lock->l_ast_data;
+                        if (args)
+                                args->fa_flags |= FA_FL_CANCELED;
+                        RETURN_EXIT;
+                }
+        }
+        EXIT;
+}
+
+static int ldlm_flock_completion_common(struct ldlm_lock *lock)
+{
+        struct ldlm_flock_info *args = lock->l_ast_data;
+        int rc = 0;
+
+        /* Protect against a race where the lock could have been just
+         * destroyed due to overlap in ldlm_process_flock_lock().
+         */
+        if (lock->l_flags & LDLM_FL_DESTROYED) {
+                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
+                return -EIO;
+        }
+
+        /* Import invalidation. We need to actually release the lock
+         * references being held, so that it can go away. No point in
+         * holding the lock even if app still believes it has it, since
+         * server already dropped it anyway. Only for granted locks too.
+         * Do the same for DEADLOCK'ed locks.
+         */
+        if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
+                enum ldlm_mode mode = args ?
+                                      args->fa_mode : lock->l_granted_mode;
+
+                /* args is NULL only for granted locks */
+                LASSERT(args != NULL ||
+                        lock->l_req_mode == lock->l_granted_mode);
+
+                if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
+                        LDLM_DEBUG(lock,
+                                   "client-side enqueue deadlock received");
+                        rc = -EDEADLK;
+                } else {
+                        LDLM_DEBUG(lock, "client-side lock cleanup");
+                        rc = -EIO;
+                }
+                ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
+        }
+
+        return rc;
+}
+
 /**
  * Flock completion callback function.
  *
@@ -642,11 +767,30 @@ restart:
  *
  * \retval 0 : success
  * \retval <0 : failure
+ *
+ * This function is called from:
+ * 1. ldlm_cli_enqueue_fini()
+ *    a) grant of a new lock or an UNLOCK (l_granted_mode == LCK_NL) lock
+ *    b) a TEST lock, l_flags & LDLM_FL_TEST_LOCK; if it cannot be granted
+ *       the server returns the conflicting lock, otherwise
+ *       l_granted_mode == LCK_NL
+ * 2. ldlm_handle_cp_callback()
+ *    a) grant of a new lock
+ *    b) cancel of a DEADLOCK'ed lock, l_flags & LDLM_FL_FLOCK_DEADLOCK,
+ *       l_granted_mode == 0
+ *    c) cancel of an async waiting lock (F_CANCELLK),
+ *       fa_flags & FA_FL_CANCELED, l_granted_mode == 0
+ * 3. cleanup_resource() (called only for the forced umount case)
+ *    a) a granted or waiting lock is to be destroyed,
+ *       lock->l_flags & flags have LDLM_FL_FAILED
+ * 4. races between the 3 above
+ *    a) cleanup vs. reply or CP AST
+ *    b) F_CANCELLK vs. CP AST granting a new lock
  */
 int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 {
-        struct file_lock *getlk = lock->l_ast_data;
+        struct ldlm_flock_info *args;
         struct obd_device *obd;
         enum ldlm_error err;
         int rc = 0;
@@ -662,8 +806,8 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                 unlock_res_and_lock(lock);
                 CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
         }
-        CDEBUG(D_DLMTRACE, "flags: %#llx data: %p getlk: %p\n",
-               flags, data, getlk);
+        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p l_ast_data: %p\n",
+               flags, data, lock->l_ast_data);
 
         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
@@ -723,44 +867,9 @@ granted:
 
         lock_res_and_lock(lock);
-
-        /* Protect against race where lock could have been just destroyed
-         * due to overlap in ldlm_process_flock_lock().
-         */
-        if (ldlm_is_destroyed(lock)) {
-                unlock_res_and_lock(lock);
-                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
-
-                /* error is returned up to ldlm_cli_enqueue_fini() caller. */
-                RETURN(-EIO);
-        }
-
-        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
-        ldlm_resource_unlink_lock(lock);
-
-        /* Import invalidation. We need to actually release the lock
-         * references being held, so that it can go away. No point in
-         * holding the lock even if app still believes it has it, since
-         * server already dropped it anyway. Only for granted locks too.
-         */
-        /* Do the same for DEADLOCK'ed locks. */
-        if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
-                int mode;
-
-                if (flags & LDLM_FL_TEST_LOCK)
-                        LASSERT(ldlm_is_test_lock(lock));
-
-                if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
-                        mode = getlk->C_FLC_TYPE;
-                else
-                        mode = lock->l_req_mode;
-
-                if (ldlm_is_flock_deadlock(lock)) {
-                        LDLM_DEBUG(lock,
-                                   "client-side enqueue deadlock received");
-                        rc = -EDEADLK;
-                }
-                ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
+        rc = ldlm_flock_completion_common(lock);
+        if (rc) {
+                lock->l_ast_data = NULL;
                 unlock_res_and_lock(lock);
 
                 /* Need to wake up the waiter if we were evicted */
@@ -769,19 +878,33 @@ granted:
                 /* An error is still to be returned, to propagate it up to
                  * ldlm_cli_enqueue_fini() caller.
                  */
-                RETURN(rc ? : -EIO);
+                RETURN(rc);
+        }
+
+        args = lock->l_ast_data;
+
+        if (lock->l_granted_mode == LCK_MINMODE) {
+                ldlm_flock_destroy(lock, args->fa_mode, LDLM_FL_WAIT_NOREPROC);
+                lock->l_ast_data = NULL;
+                unlock_res_and_lock(lock);
+                CERROR("%s: client-side: only an asynchronous lock enqueue can be canceled by F_CANCELLK\n",
+                       lock->l_export->exp_obd->obd_name);
+                RETURN(-EIO);
+        }
+
+        if (args->fa_flags & FA_FL_CANCEL_RQST) {
+                LDLM_DEBUG(lock, "client-side granted F_CANCELLK lock");
+                ldlm_flock_mark_canceled(lock);
         }
 
         LDLM_DEBUG(lock, "client-side enqueue granted");
 
         if (flags & LDLM_FL_TEST_LOCK) {
-                /*
-                 * fcntl(F_GETLK) request
-                 * The old mode was saved in getlk->C_FLC_TYPE so that if the
-                 * mode in the lock changes we can decref the appropriate
-                 * refcount.
-                 */
+                struct file_lock *getlk = args->fa_fl;
+
+                /* fcntl(F_GETLK) request */
                 LASSERT(ldlm_is_test_lock(lock));
-                ldlm_flock_destroy(lock, getlk->C_FLC_TYPE, LDLM_FL_WAIT_NOREPROC);
+                ldlm_flock_destroy(lock, args->fa_mode, LDLM_FL_WAIT_NOREPROC);
+
                 switch (lock->l_granted_mode) {
                 case LCK_PR:
                         getlk->C_FLC_TYPE = F_RDLCK;
@@ -798,16 +921,96 @@ granted:
         } else {
                 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
 
+                /* ldlm_lock_enqueue() has already placed the lock on the
+                 * granted list.
+                 */
+                ldlm_resource_unlink_lock(lock);
+
                 /* We need to reprocess the lock to do merges or splits
                  * with existing locks owned by this process.
                  */
                 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
         }
+        lock->l_ast_data = NULL;
         unlock_res_and_lock(lock);
         RETURN(rc);
 }
 EXPORT_SYMBOL(ldlm_flock_completion_ast);
 
+/* This function is called in the same cases as ldlm_flock_completion_ast(),
+ * except for UNLOCK, TEST lock and F_CANCELLK, which use only the
+ * synchronous mechanism.
+ */
+struct ldlm_flock_info *
+ldlm_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
+{
+        __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
+        enum ldlm_error err;
+        int rc;
+        struct ldlm_flock_info *args;
+
+        ENTRY;
+        LDLM_DEBUG(lock, "flags: 0x%llx data: %p l_ast_data: %p",
+                   flags, data, lock->l_ast_data);
+
+        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
+
+        lock_res_and_lock(lock);
+
+        args = lock->l_ast_data;
+        rc = ldlm_flock_completion_common(lock);
+        if (rc != 0)
+                GOTO(out, rc);
+
+        if (lock->l_granted_mode != LCK_NL) {
+                if (args == NULL) {
+                        LDLM_DEBUG(lock,
+                                   "client-side lock is already granted in a race");
+                        LASSERT(lock->l_granted_mode == lock->l_req_mode);
+                        LASSERT(lock->l_granted_mode != LCK_MINMODE);
+                        GOTO(out, rc = 0);
+                }
+
+                if (args->fa_flags & FA_FL_CANCELED ||
+                    ((flags & LDLM_FL_BLOCKED_MASK) == 0 &&
+                     lock->l_granted_mode == LCK_MINMODE)) {
+                        LDLM_DEBUG(lock, "client-side granted canceled lock");
+                        ldlm_flock_destroy(lock, args->fa_mode,
+                                           LDLM_FL_WAIT_NOREPROC);
+                        GOTO(out, rc = -EIO);
+                }
+        }
+
+        if (flags & LDLM_FL_BLOCKED_MASK) {
+                LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock");
+                args = NULL;
+                GOTO(out, rc = 0);
+        }
+
+        if (data != NULL)
+                LDLM_DEBUG(lock, "client-side granted a blocked lock");
+        else
+                LDLM_DEBUG(lock, "client-side lock granted");
+
+        /* ldlm_lock_enqueue() has already placed the lock on the granted
+         * list.
+         */
+        ldlm_resource_unlink_lock(lock);
+
+        /* We need to reprocess the lock to do merges or splits
+         * with existing locks owned by this process.
+         */
+        ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
+
+out:
+        if (args != NULL) {
+                lock->l_ast_data = NULL;
+                args->fa_err = rc;
+        }
+        unlock_res_and_lock(lock);
+
+        RETURN(args);
+}
+EXPORT_SYMBOL(ldlm_flock_completion_ast_async);
+
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                             void *data, int flag)
 {
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 4c4f09a..d6bcd6b 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -945,6 +945,15 @@ struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len)
 }
 EXPORT_SYMBOL(ldlm_enqueue_pack);
 
+static void ldlm_lock_add_to_enqueueing(struct ldlm_lock *lock)
+{
+        struct ldlm_resource *res = lock->l_resource;
+
+        lock_res(res);
+        ldlm_resource_add_lock(res, &res->lr_enqueueing, lock);
+        unlock_res(res);
+}
+
 /**
  * Client-side lock enqueue.
  *
@@ -1013,6 +1022,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                         LBUG();
 
                 lock->l_req_extent = policy->l_extent;
+        } else if (einfo->ei_type == LDLM_FLOCK) {
+                ldlm_lock_add_to_enqueueing(lock);
         }
 
         LDLM_DEBUG(lock, "client-side enqueue START, flags %#llx", *flags);
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index 167ae0d..2c453f9 100644
--- a/lustre/ldlm/ldlm_resource.c
+++ b/lustre/ldlm/ldlm_resource.c
@@ -1477,6 +1477,7 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
 
         INIT_LIST_HEAD(&res->lr_granted);
         INIT_LIST_HEAD(&res->lr_waiting);
+        INIT_LIST_HEAD(&res->lr_enqueueing);
 
         refcount_set(&res->lr_refcount, 1);
         spin_lock_init(&res->lr_lock);
@@ -1617,6 +1618,11 @@ static void __ldlm_resource_putref_final(struct cfs_hash_bd *bd,
                 LBUG();
         }
 
+        if (!list_empty(&res->lr_enqueueing)) {
+                ldlm_resource_dump(D_ERROR, res);
+                LBUG();
+        }
+
         cfs_hash_bd_del_locked(nsb->nsb_namespace->ns_rs_hash,
                                bd, &res->lr_hash);
         if (atomic_dec_and_test(&nsb->nsb_count))
@@ -1673,6 +1679,8 @@ static void __ldlm_resource_add_lock(struct ldlm_resource *res,
 
         if (res->lr_type == LDLM_IBITS)
                 ldlm_inodebits_add_lock(res, head, lock, tail);
+        else if (res->lr_type == LDLM_FLOCK)
+                LASSERT(lock->l_req_mode != LCK_NL || head != &res->lr_waiting);
 
         ldlm_resource_dump(D_INFO, res);
 }
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 4d364de..0dc7d07 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -5244,44 +5244,25 @@ int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
         RETURN(rc);
 }
 
-static int
-ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
+static int ll_file_flc2policy(struct file_lock *file_lock, int cmd,
+                              union ldlm_policy_data *flock)
 {
-        struct inode *inode = file_inode(file);
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct ldlm_enqueue_info einfo = {
-                .ei_type        = LDLM_FLOCK,
-                .ei_cb_cp       = ldlm_flock_completion_ast,
-                .ei_cbdata      = file_lock,
-        };
-        struct md_op_data *op_data;
-        struct lustre_handle lockh = { 0 };
-        union ldlm_policy_data flock = { { 0 } };
-        struct file_lock flbuf = *file_lock;
-        int fl_type = file_lock->C_FLC_TYPE;
-        ktime_t kstart = ktime_get();
-        __u64 flags = 0;
-        int rc;
-        int rc2 = 0;
         ENTRY;
-        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID" file_lock=%p\n",
-               PFID(ll_inode2fid(inode)), file_lock);
 
         if (file_lock->C_FLC_FLAGS & FL_FLOCK) {
                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
                 /* flocks are whole-file locks */
-                flock.l_flock.end = OFFSET_MAX;
+                flock->l_flock.end = OFFSET_MAX;
                 /* For flocks owner is determined by the local file desctiptor*/
-                flock.l_flock.owner = (unsigned long)file_lock->C_FLC_FILE;
+                flock->l_flock.owner = (unsigned long)file_lock->C_FLC_FILE;
         } else if (file_lock->C_FLC_FLAGS & FL_POSIX) {
-                flock.l_flock.owner = (unsigned long)file_lock->C_FLC_OWNER;
-                flock.l_flock.start = file_lock->fl_start;
-                flock.l_flock.end = file_lock->fl_end;
+                flock->l_flock.owner = (unsigned long)file_lock->C_FLC_OWNER;
+                flock->l_flock.start = file_lock->fl_start;
+                flock->l_flock.end = file_lock->fl_end;
         } else {
                 RETURN(-EINVAL);
         }
-        flock.l_flock.pid = file_lock->C_FLC_PID;
+        flock->l_flock.pid = file_lock->C_FLC_PID;
 
 #if defined(HAVE_LM_COMPARE_OWNER) || defined(lm_compare_owner)
         /* Somewhat ugly workaround for svc lockd.
@@ -5293,8 +5274,208 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
          * pointer space for current->files are not intersecting
          */
         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
-                flock.l_flock.owner = (unsigned long)file_lock->C_FLC_PID;
+                flock->l_flock.owner = (unsigned long)file_lock->C_FLC_PID;
 #endif
+
+        RETURN(0);
+}
+
+static int ll_file_flock_lock(struct file *file, struct file_lock *file_lock)
+{
+        int rc = -EINVAL;
+
+        /* We don't need to sleep on conflicting locks.
+         * It is called in the following use cases:
+         * 1. adding a new lock - no conflicts exist, as the lock is
+         *    already granted on the server.
+         * 2. unlock - never conflicts with anything.
+         */
+        file_lock->fl_flags &= ~FL_SLEEP;
+#ifdef HAVE_LOCKS_LOCK_FILE_WAIT
+        rc = locks_lock_file_wait(file, file_lock);
+#else
+        if (file_lock->fl_flags & FL_FLOCK) {
+                rc = flock_lock_file_wait(file, file_lock);
+        } else if (file_lock->fl_flags & FL_POSIX) {
+                rc = posix_lock_file(file, file_lock, NULL);
+        }
+#endif /* HAVE_LOCKS_LOCK_FILE_WAIT */
+        if (rc)
+                CDEBUG_LIMIT(rc == -ENOENT ? D_DLMTRACE : D_ERROR,
+                             "kernel lock failed: rc = %d\n", rc);
+
+        return rc;
+}
+
+static int ll_flock_upcall(void *cookie, int err);
+static int
+ll_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data);
+
+static int ll_file_flock_async_unlock(struct inode *inode,
+                                      struct file_lock *file_lock)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
+                                           .ei_cb_cp =
+                                                ll_flock_completion_ast_async,
+                                           .ei_mode = LCK_NL,
+                                           .ei_cbdata = NULL };
+        union ldlm_policy_data flock = { {0} };
+        struct md_op_data *op_data;
+        int rc;
+
+        ENTRY;
+        rc = ll_file_flc2policy(file_lock, F_SETLK, &flock);
+        if (rc)
+                RETURN(rc);
+
+        op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
+                                     LUSTRE_OPC_ANY, NULL);
+        if (IS_ERR(op_data))
+                RETURN(PTR_ERR(op_data));
+
+        rc = md_enqueue_async(sbi->ll_md_exp, &einfo, ll_flock_upcall,
+                              op_data, &flock, 0);
+
+        ll_finish_md_op_data(op_data);
+
+        RETURN(rc);
+}
+
+/* This function is called only once after the ldlm callback. Args are
+ * already detached from the lock, so no locking is needed.
+ * It should only report the lock status to the kernel.
+ */
+static void ll_file_flock_async_cb(struct ldlm_flock_info *args)
+{
+        struct file_lock *file_lock = args->fa_fl;
+        struct file_lock *flc = &args->fa_flc;
+        struct file *file = args->fa_file;
+        struct inode *inode = file->f_path.dentry->d_inode;
+        int err = args->fa_err;
+        int rc;
+
+        ENTRY;
+        CDEBUG(D_INFO, "err=%d file_lock=%p file=%p start=%llu end=%llu\n",
+               err, file_lock, file, flc->fl_start, flc->fl_end);
+
+        /* The kernel is responsible for resolving grant vs. F_CANCELLK
+         * and grant vs. cleanup races, so it may happen that the CANCELED
+         * flag isn't set and err == 0, because F_CANCELLK/cleanup happens
+         * between ldlm_flock_completion_ast_async() and
+         * ll_flock_run_flock_cb(). In this case notify() returns an error
+         * for the already canceled flock.
+         */
+        if (!(args->fa_flags & FA_FL_CANCELED)) {
+                struct file_lock notify_lock;
+
+                locks_init_lock(&notify_lock);
+                locks_copy_lock(&notify_lock, flc);
+
+                if (err == 0)
+                        ll_file_flock_lock(file, flc);
+
+                wait_event_idle(args->fa_waitq, args->fa_ready);
+
+#ifdef HAVE_LM_GRANT_2ARGS
+                rc = args->fa_notify(&notify_lock, err);
+#else
+                rc = args->fa_notify(&notify_lock, NULL, err);
+#endif
+                if (rc) {
+                        CDEBUG_LIMIT(D_ERROR,
+                                     "notify failed file_lock=%p err=%d\n",
+                                     file_lock, err);
+                        if (err == 0) {
+                                flc->C_FLC_TYPE = F_UNLCK;
+                                ll_file_flock_lock(file, flc);
+                                ll_file_flock_async_unlock(inode, flc);
+                        }
+                }
+        }
+
+        fput(file);
+
+        EXIT;
+}
+
+static void ll_flock_run_flock_cb(struct ldlm_flock_info *args)
+{
+        if (args) {
+                ll_file_flock_async_cb(args);
+                OBD_FREE_PTR(args);
+        }
+}
+
+static int ll_flock_upcall(void *cookie, int err)
+{
+        struct ldlm_flock_info *args;
+        struct ldlm_lock *lock = cookie;
+
+        if (err != 0) {
+                CERROR("ldlm_cli_enqueue_fini lock=%p : rc = %d\n", lock, err);
+
+                lock_res_and_lock(lock);
+                args = lock->l_ast_data;
+                lock->l_ast_data = NULL;
+                unlock_res_and_lock(lock);
+
+                if (args)
+                        args->fa_err = err;
+                ll_flock_run_flock_cb(args);
+        }
+
+        return 0;
+}
+
+static int
+ll_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
+{
+        struct ldlm_flock_info *args;
+
+        ENTRY;
+
+        args = ldlm_flock_completion_ast_async(lock, flags, data);
+        if (args && args->fa_flags & FA_FL_CANCELED) {
+                /* lock was cancelled in a race */
+                struct inode *inode = args->fa_file->f_path.dentry->d_inode;
+
+                ll_file_flock_async_unlock(inode, &args->fa_flc);
+        }
+
+        ll_flock_run_flock_cb(args);
+
+        RETURN(0);
+}
+
+static int
+ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
+{
+        struct inode *inode = file_inode(file);
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ldlm_enqueue_info einfo = {
+                .ei_type        = LDLM_FLOCK,
+                .ei_cb_cp       = ldlm_flock_completion_ast,
+                .ei_cbdata      = NULL,
+        };
+        struct md_op_data *op_data;
+        struct lustre_handle lockh = { 0 };
+        union ldlm_policy_data flock = { { 0 } };
+        struct file_lock flbuf = *file_lock;
+        int fl_type = file_lock->C_FLC_TYPE;
+        ktime_t kstart = ktime_get();
+        __u64 flags = 0;
+        struct ldlm_flock_info *cb_data = NULL;
+        int rc;
+
+        ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID" file_lock=%p\n",
+               PFID(ll_inode2fid(inode)), file_lock);
+
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
+
+        rc = ll_file_flc2policy(file_lock, cmd, &flock);
+        if (rc)
+                RETURN(rc);
 
         switch (fl_type) {
         case F_RDLCK:
@@ -5346,6 +5527,13 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                  */
                 posix_test_lock(file, &flbuf);
                 break;
+        case F_CANCELLK:
+                CDEBUG(D_DLMTRACE, "F_CANCELLK owner=%llx %llu-%llu\n",
+                       flock.l_flock.owner, flock.l_flock.start,
+                       flock.l_flock.end);
+                file_lock->C_FLC_TYPE = F_UNLCK;
+                einfo.ei_mode = LCK_NL;
+                break;
         default:
                 rc = -EINVAL;
                 CERROR("%s: fcntl from '%s' unknown lock command=%d: rc = %d\n",
@@ -5353,51 +5541,102 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                 RETURN(rc);
         }
 
-        /* Save the old mode so that if the mode in the lock changes we
-         * can decrement the appropriate reader or writer refcount.
-         */
-        file_lock->C_FLC_TYPE = einfo.ei_mode;
+        CDEBUG(D_DLMTRACE,
+               "inode="DFID", pid=%u, owner=%#llx, flags=%#llx, mode=%u, start=%llu, end=%llu\n",
+               PFID(ll_inode2fid(inode)), flock.l_flock.pid,
+               flock.l_flock.owner, flags, einfo.ei_mode,
+               flock.l_flock.start, flock.l_flock.end);
 
         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
                                      LUSTRE_OPC_ANY, NULL);
         if (IS_ERR(op_data))
                 RETURN(PTR_ERR(op_data));
 
-        CDEBUG(D_DLMTRACE,
-               "inode="DFID", pid=%u, flags=%#llx, mode=%u, start=%llu, end=%llu\n",
-               PFID(ll_inode2fid(inode)),
-               flock.l_flock.pid, flags, einfo.ei_mode,
-               flock.l_flock.start, flock.l_flock.end);
+        OBD_ALLOC_PTR(cb_data);
+        if (!cb_data)
+                GOTO(out, rc = -ENOMEM);
 
-        rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data, &lockh,
-                        flags);
+        cb_data->fa_file = file;
+        cb_data->fa_fl = file_lock;
+        cb_data->fa_mode = einfo.ei_mode;
+        init_waitqueue_head(&cb_data->fa_waitq);
+        locks_init_lock(&cb_data->fa_flc);
+        locks_copy_lock(&cb_data->fa_flc, file_lock);
+        if (cmd == F_CANCELLK)
+                cb_data->fa_flags |= FA_FL_CANCEL_RQST;
+        einfo.ei_cbdata = cb_data;
+
+        if (file_lock->fl_lmops && file_lock->fl_lmops->lm_grant &&
+            file_lock->C_FLC_TYPE != F_UNLCK &&
+            flags == LDLM_FL_BLOCK_NOWAIT /* F_SETLK/F_SETLK64 */) {
+
+                cb_data->fa_notify = file_lock->fl_lmops->lm_grant;
+                flags = (file_lock->fl_flags & FL_SLEEP) ?
+                        0 : LDLM_FL_BLOCK_NOWAIT;
+                einfo.ei_cb_cp = ll_flock_completion_ast_async;
+                get_file(file);
+
+                rc = md_enqueue_async(sbi->ll_md_exp, &einfo,
+                                      ll_flock_upcall, op_data, &flock, flags);
+                if (rc) {
+                        fput(file);
+                        OBD_FREE_PTR(cb_data);
+                        cb_data = NULL;
+                } else {
+                        rc = FILE_LOCK_DEFERRED;
+                }
+        } else {
+                if (file_lock->C_FLC_TYPE == F_UNLCK &&
+                    flags != LDLM_FL_TEST_LOCK) {
+                        /* We unlock the kernel lock before the ldlm one to
+                         * avoid a race with reordering of unlock & lock
+                         * responses from the server.
+                         */
+                        cb_data->fa_flc.fl_flags |= FL_EXISTS;
+                        rc = ll_file_flock_lock(file, &cb_data->fa_flc);
+                        if (rc) {
+                                if (rc == -ENOENT) {
+                                        if (!(file_lock->C_FLC_FLAGS &
+                                              FL_EXISTS))
+                                                rc = 0;
+                                } else {
+                                        CDEBUG_LIMIT(D_ERROR,
+                                                     "local unlock failed rc=%d\n",
+                                                     rc);
+                                }
+                                OBD_FREE_PTR(cb_data);
+                                cb_data = NULL;
+                                GOTO(out, rc);
+                        }
+                }
+
-        /* Restore the file lock type if not TEST lock. */
-        if (!(flags & LDLM_FL_TEST_LOCK))
-                file_lock->C_FLC_TYPE = fl_type;
+                rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data,
+                                &lockh, flags);
 
-#ifdef HAVE_LOCKS_LOCK_FILE_WAIT
-        if ((rc == 0 || file_lock->C_FLC_TYPE == F_UNLCK) &&
-            !(flags & LDLM_FL_TEST_LOCK))
-                rc2 = locks_lock_file_wait(file, file_lock);
-#else
-        if ((file_lock->C_FLC_FLAGS & FL_FLOCK) &&
-            (rc == 0 || file_lock->C_FLC_TYPE == F_UNLCK))
-                rc2 = flock_lock_file_wait(file, file_lock);
-        if ((file_lock->C_FLC_FLAGS & FL_POSIX) &&
-            (rc == 0 || file_lock->C_FLC_TYPE == F_UNLCK) &&
-            !(flags & LDLM_FL_TEST_LOCK))
-                rc2 = posix_lock_file_wait(file, file_lock);
-#endif /* HAVE_LOCKS_LOCK_FILE_WAIT */
-        if (rc2 && file_lock->C_FLC_TYPE != F_UNLCK) {
-                einfo.ei_mode = LCK_NL;
-                md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data,
-                           &lockh, flags);
-                rc = rc2;
-        }
+                if (!rc && file_lock->C_FLC_TYPE != F_UNLCK &&
+                    !(flags & LDLM_FL_TEST_LOCK)) {
+                        int rc2;
+
+                        rc2 = ll_file_flock_lock(file, file_lock);
+
+                        if (rc2) {
+                                einfo.ei_mode = LCK_NL;
+                                cb_data->fa_mode = einfo.ei_mode;
+                                md_enqueue(sbi->ll_md_exp, &einfo, &flock,
+                                           op_data, &lockh, flags);
+                                rc = rc2;
+                        }
+                }
+                OBD_FREE_PTR(cb_data);
+                cb_data = NULL;
+        }
+out:
         ll_finish_md_op_data(op_data);
 
+        if (cb_data) {
+                cb_data->fa_ready = 1;
+                wake_up(&cb_data->fa_waitq);
+        }
+
         if (rc == 0 && (flags & LDLM_FL_TEST_LOCK) &&
             flbuf.C_FLC_TYPE != file_lock->C_FLC_TYPE) {
                 /* Verify local & remote */
diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c
index 03b257b..c5623e7 100644
--- a/lustre/lmv/lmv_obd.c
+++ b/lustre/lmv/lmv_obd.c
@@ -2318,6 +2318,34 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         RETURN(rc);
 }
 
+static int
+lmv_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                  obd_enqueue_update_f upcall, struct md_op_data *op_data,
+                  const union ldlm_policy_data *policy, __u64 flags)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct lmv_tgt_desc *tgt;
+        int rc;
+
+        ENTRY;
+
+        CDEBUG(D_INODE, "ENQUEUE ASYNC on "DFID"\n",
+               PFID(&op_data->op_fid1));
+
+        tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
+        if (IS_ERR(tgt))
+                RETURN(PTR_ERR(tgt));
+
+        CDEBUG(D_INODE, "ENQUEUE ASYNC on "DFID" -> mds #%d\n",
+               PFID(&op_data->op_fid1), tgt->ltd_index);
+
+        rc = md_enqueue_async(tgt->ltd_exp, einfo, upcall, op_data, policy,
+                              flags);
+
+        RETURN(rc);
+}
+
 int
 lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
                  struct ptlrpc_request **preq)
@@ -4463,6 +4491,7 @@ static const struct md_ops lmv_md_ops = {
         .m_close                = lmv_close,
         .m_create               = lmv_create,
         .m_enqueue              = lmv_enqueue,
+        .m_enqueue_async        = lmv_enqueue_async,
         .m_getattr              = lmv_getattr,
         .m_getxattr             = lmv_getxattr,
         .m_getattr_name         = lmv_getattr_name,
diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h
index 197f864..d07e7e8 100644
--- a/lustre/mdc/mdc_internal.h
+++ b/lustre/mdc/mdc_internal.h
@@ -72,6 +72,11 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 const union ldlm_policy_data *policy,
                 struct md_op_data *op_data,
                 struct lustre_handle *lockh, __u64 extra_lock_flags);
+
+int mdc_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                      obd_enqueue_update_f upcall, struct md_op_data *op_data,
+                      const union ldlm_policy_data *policy, __u64 lock_flags);
+
 int mdc_resource_get_unused_res(struct obd_export *exp,
                                 struct ldlm_res_id *res_id,
                                 struct list_head *cancels,
diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c
index 7b44133..a772865 100644
--- a/lustre/mdc/mdc_locks.c
+++ b/lustre/mdc/mdc_locks.c
@@ -33,6 +33,14 @@ struct mdc_getattr_args {
         struct md_op_item       *ga_item;
 };
 
+struct mdc_enqueue_args {
+        struct ldlm_lock        *mea_lock;
+        struct obd_export       *mea_exp;
+        enum ldlm_mode           mea_mode;
+        __u64                    mea_flags;
+        obd_enqueue_update_f     mea_upcall;
+};
+
 int it_open_error(int phase, struct lookup_intent *it)
 {
         if (it_disposition(it, DISP_OPEN_LEASE)) {
@@ -1180,6 +1188,86 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                            op_data, lockh, extra_lock_flags);
 }
 
+static int mdc_enqueue_async_interpret(const struct lu_env *env,
+                                       struct ptlrpc_request *req,
+                                       void *args, int rc)
+{
+        struct mdc_enqueue_args *mea = args;
+        struct obd_export *exp = mea->mea_exp;
+        struct ldlm_lock *lock = mea->mea_lock;
+        struct lustre_handle lockh;
+        struct ldlm_enqueue_info einfo = {
+                .ei_type = LDLM_FLOCK,
+                .ei_mode = mea->mea_mode,
+        };
+
+        ENTRY;
+        CDEBUG(D_INFO, "req=%p rc=%d\n", req, rc);
+
+        ldlm_lock2handle(lock, &lockh);
+        rc = ldlm_cli_enqueue_fini(exp, &req->rq_pill, &einfo, 1,
+                                   &mea->mea_flags, NULL, 0, &lockh, rc, true);
+        if (rc == -ENOLCK)
+                LDLM_LOCK_RELEASE(lock);
+
+        /* we expect failed_lock_cleanup() to destroy the lock */
+        if (rc != 0)
+                LASSERT(list_empty(&lock->l_res_link));
+
+        if (mea->mea_upcall != NULL)
+                mea->mea_upcall(lock, rc);
+
+        LDLM_LOCK_PUT(lock);
+
+        RETURN(rc);
+}
+
+int mdc_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                      obd_enqueue_update_f upcall, struct md_op_data *op_data,
+                      const union ldlm_policy_data *policy, __u64 flags)
+{
+        struct mdc_enqueue_args *mea;
+        struct ptlrpc_request *req;
+        int rc;
+        struct ldlm_res_id res_id;
+        struct lustre_handle lockh;
+
+        ENTRY;
+        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
+
+        LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
+                 einfo->ei_type);
+        res_id.name[3] = LDLM_FLOCK;
+
+        req = ldlm_enqueue_pack(exp, 0);
+        if (IS_ERR(req))
+                RETURN(PTR_ERR(req));
+
+        einfo->ei_req_slot = 1;
+        einfo->ei_mod_slot = 1;
+
+        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
+                              0, 0, &lockh, 1);
+        if (rc) {
+                ptlrpc_req_put(req);
+                RETURN(rc);
+        }
+
+        mea = ptlrpc_req_async_args(mea, req);
+        mea->mea_exp = exp;
+        mea->mea_lock = ldlm_handle2lock(&lockh);
+        LASSERT(mea->mea_lock != NULL);
+
+        mea->mea_mode = einfo->ei_mode;
+        mea->mea_flags = flags;
+        mea->mea_upcall = upcall;
+
+        req->rq_interpret_reply = mdc_enqueue_async_interpret;
+        ptlrpcd_add_req(req);
+
+        RETURN(0);
+}
+
 static int mdc_finish_intent_lock(struct obd_export *exp,
                                   struct ptlrpc_request *request,
                                   struct md_op_data *op_data,
diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c
index 6eee865..05ee41e 100644
--- a/lustre/mdc/mdc_request.c
+++ b/lustre/mdc/mdc_request.c
@@ -3103,6 +3103,7 @@ static const struct md_ops mdc_md_ops = {
         .m_close                = mdc_close,
         .m_create               = mdc_create,
         .m_enqueue              = mdc_enqueue,
+        .m_enqueue_async        = mdc_enqueue_async,
         .m_getattr              = mdc_getattr,
         .m_getattr_name         = mdc_getattr_name,
         .m_intent_lock          = mdc_intent_lock,
diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c
index a11e826..9f35021 100644
--- a/lustre/obdclass/lprocfs_status.c
+++ b/lustre/obdclass/lprocfs_status.c
@@ -1614,6 +1614,7 @@ static const char * const mps_stats[] = {
         [LPROC_MD_CLOSE]                = "close",
         [LPROC_MD_CREATE]               = "create",
         [LPROC_MD_ENQUEUE]              = "enqueue",
+        [LPROC_MD_ENQUEUE_ASYNC]        = "enqueue_async",
         [LPROC_MD_GETATTR]              = "getattr",
         [LPROC_MD_INTENT_LOCK]          = "intent_lock",
         [LPROC_MD_LINK]                 = "link",
-- 
1.8.3.1
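[ Reviewer note, not part of the patch: the F_CANCELLK case added to
  ll_file_flock() is the other half of the deferred-lock contract.
  When lockd gives up on a blocked request it calls vfs_cancel_lock()
  (a real kernel interface), which invokes ->lock() with
  cmd == F_CANCELLK; the patch turns that into an F_UNLCK/LCK_NL
  enqueue and marks the matching enqueueing or waiting DLM lock with
  FA_FL_CANCELED. A minimal caller-side sketch, using the hypothetical
  name my_cancel_blocked():

    static void my_cancel_blocked(struct file *file, struct file_lock *fl)
    {
            /* Reaches ll_file_flock(file, F_CANCELLK, fl). If the lock
             * was granted concurrently, the FA_FL_CANCEL_RQST handling
             * in the completion AST releases it again, so the request
             * can be treated as cancelled either way.
             */
            vfs_cancel_lock(file, fl);
    }
]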