LU-2525 ldlm: add asynchronous flocks 89/4889/79
author Andriy Skulysh <c17819@cray.com>
Tue, 5 Feb 2019 13:37:48 +0000 (15:37 +0200)
committer Oleg Drokin <green@whamcloud.com>
Sun, 24 Nov 2024 06:08:33 +0000 (06:08 +0000)
Add support for asynchronous flocks.
For now they are used only by the Linux nfsd.
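For context: the kernel delivers asynchronous lock results through the
lock_manager_operations.lm_grant callback, and a filesystem ->lock() method
signals deferral by returning FILE_LOCK_DEFERRED. A minimal sketch of that
convention (the example_* names are hypothetical; it assumes the two-argument
lm_grant of kernels >= 3.17):

#include <linux/fs.h>

/* hypothetical helper that queues the request for later granting */
static void example_queue_lock(struct file *file, struct file_lock *fl);

static int example_file_lock(struct file *file, int cmd, struct file_lock *fl)
{
        if (fl->fl_lmops && fl->fl_lmops->lm_grant) {
                /* async-capable caller (nfsd/lockd): defer the request */
                example_queue_lock(file, fl);
                return FILE_LOCK_DEFERRED;
        }
        /* synchronous caller: take the lock in-line */
        return posix_lock_file(file, fl, NULL);
}

/* called later, once the deferred request has been granted (or failed) */
static void example_grant(struct file_lock *fl, int result)
{
        fl->fl_lmops->lm_grant(fl, result);
}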

HPE-bug-id: LUS-3210, LUS-7034, LUS-7031, LUS-8832, LUS-8313
HPE-bug-id: LUS-8592
Change-Id: Iefafaf014fd06d569dc5d1dd22ebb3518d04e99a
Reviewed-by: Vitaly Fertman <c17818@cray.com>
Reviewed-by: Alexander Boyko <c17825@cray.com>
Signed-off-by: Andriy Skulysh <andriy.skulysh@hpe.com>
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/4889
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Alexander Boyko <alexander.boyko@hpe.com>
Reviewed-by: Alexey Lyashkov <alexey.lyashkov@hpe.com>
Reviewed-by: Vitaly Fertman <vitaly.fertman@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
13 files changed:
lustre/autoconf/lustre-core.m4
lustre/include/lustre_dlm.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/llite/file.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/obdclass/lprocfs_status.c

index d955dc2..794dfba 100644 (file)
@@ -1146,6 +1146,23 @@ AC_DEFUN([LC_HAVE_BLK_INTEGRITY_ITER], [
 ]) # LC_HAVE_BLK_INTEGRITY_ITER
 
 #
+# LC_HAVE_LM_GRANT_2ARGS
+#
+# 3.17 removed the unused argument from lm_grant
+#
+AC_DEFUN([LC_HAVE_LM_GRANT_2ARGS], [
+LB_CHECK_COMPILE([if 'lock_manager_operations.lm_grant' takes two args],
+lm_grant, [
+       #include <linux/fs.h>
+],[
+       ((struct lock_manager_operations *)NULL)->lm_grant(NULL, 0);
+],[
+       AC_DEFINE(HAVE_LM_GRANT_2ARGS, 1,
+               [lock_manager_operations.lm_grant takes two args])
+])
+]) # LC_HAVE_LM_GRANT_2ARGS
+
+#
 # LC_NFS_FILLDIR_USE_CTX
 #
 # 3.18 kernel moved from void cookie to struct dir_context
@@ -4255,6 +4272,8 @@ AC_DEFUN([LC_HAVE_LOCKS_LOCK_FILE_WAIT_IN_FILELOCK], [
                        [kernel has locks_lock_file_wait in filelock.h])
                AC_DEFINE(HAVE_LINUX_FILELOCK_HEADER, 1,
                        [linux/filelock.h is present])
+               AC_DEFINE(HAVE_LM_GRANT_2ARGS, 1,
+                       [lock_manager_operations.lm_grant takes two args])
        ])
 ]) # LC_HAVE_LOCKS_LOCK_FILE_WAIT_IN_FILELOCK
 
@@ -5079,6 +5098,7 @@ AC_DEFUN([LC_PROG_LINUX_RESULTS], [
        LC_HAVE_INTERVAL_BLK_INTEGRITY
        LC_KEY_MATCH_DATA
        LC_HAVE_BLK_INTEGRITY_ITER
+       LC_HAVE_LM_GRANT_2ARGS
 
        # 3.18
        LC_PERCPU_COUNTER_INIT
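For reference, the two prototypes this check tells apart look like this (a
sketch; before 3.17 lm_grant carried a second, unused struct file_lock *
argument, matching the two fa_notify variants declared below):

/* kernels < 3.17 */
int (*lm_grant)(struct file_lock *fl, struct file_lock *conf, int result);

/* kernels >= 3.17, i.e. HAVE_LM_GRANT_2ARGS */
int (*lm_grant)(struct file_lock *fl, int result);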
index 0a962a5..cd4d6f0 100644 (file)
@@ -30,6 +30,9 @@
 #include <lustre_import.h>
 #include <lustre_handles.h>
 #include <linux/interval_tree_generic.h>
+#ifdef HAVE_LINUX_FILELOCK_HEADER
+#include <linux/filelock.h>
+#endif
 
 #include "lustre_dlm_flags.h"
 
@@ -1113,6 +1116,8 @@ struct ldlm_resource {
         * that are waiting for conflicts to go away
         */
        struct list_head        lr_waiting;
+       /* List of flock locks waiting to be enqueued */
+       struct list_head        lr_enqueueing;
        /** @} */
 
        /** Resource name */
@@ -1287,6 +1292,27 @@ struct ldlm_enqueue_info {
 
 #define ei_res_id      ei_cb_gl
 
+enum ldlm_flock_flags {
+       FA_FL_CANCEL_RQST       = 1,
+       FA_FL_CANCELED          = 2,
+};
+
+struct ldlm_flock_info {
+       struct file             *fa_file;
+       struct file_lock        *fa_fl; /* original file_lock */
+       struct file_lock        fa_flc; /* lock copy */
+       enum ldlm_flock_flags   fa_flags;
+       enum ldlm_mode          fa_mode;
+#ifdef HAVE_LM_GRANT_2ARGS
+       int (*fa_notify)(struct file_lock *, int);
+#else
+       int (*fa_notify)(struct file_lock *, struct file_lock *, int);
+#endif
+       int                     fa_err;
+       int                     fa_ready;
+       wait_queue_head_t       fa_waitq;
+};
+
 extern char *ldlm_lockname[];
 extern char *ldlm_typename[];
 extern const char *ldlm_it2str(enum ldlm_intent_flags it);
@@ -1421,6 +1447,9 @@ int ldlm_replay_locks(struct obd_import *imp);
 
 /* ldlm_flock.c */
 int ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data);
+struct ldlm_flock_info *
+ldlm_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags,
+                               void *data);
 
 /* ldlm_extent.c */
 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms);
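A hedged sketch (example_setup() is hypothetical) of how a caller fills in
struct ldlm_flock_info before an asynchronous enqueue, mirroring the cb_data
setup in ll_file_flock() in the llite/file.c hunk further below:

static int example_setup(struct file *file, struct file_lock *file_lock,
                         struct ldlm_enqueue_info *einfo)
{
        struct ldlm_flock_info *cb_data;

        OBD_ALLOC_PTR(cb_data);
        if (cb_data == NULL)
                return -ENOMEM;

        cb_data->fa_file = file;                /* used by the completion cb */
        cb_data->fa_fl = file_lock;             /* the original request */
        cb_data->fa_mode = einfo->ei_mode;      /* saved for error cleanup */
        cb_data->fa_notify = file_lock->fl_lmops->lm_grant;
        init_waitqueue_head(&cb_data->fa_waitq);
        locks_init_lock(&cb_data->fa_flc);
        locks_copy_lock(&cb_data->fa_flc, file_lock);   /* private copy */
        einfo->ei_cbdata = cb_data;             /* becomes lock->l_ast_data */
        return 0;
}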
index 345327e..1e60378 100644 (file)
@@ -1244,6 +1244,10 @@ struct md_ops {
                         const union ldlm_policy_data *, struct md_op_data *,
                         struct lustre_handle *, __u64);
 
+       int (*m_enqueue_async)(struct obd_export *, struct ldlm_enqueue_info *,
+                              obd_enqueue_update_f, struct md_op_data *,
+                              const union ldlm_policy_data *, __u64);
+
        int (*m_getattr)(struct obd_export *, struct md_op_data *,
                         struct ptlrpc_request **);
 
index 35f486e..c92b930 100644 (file)
@@ -1393,6 +1393,7 @@ enum mps_stat_idx {
        LPROC_MD_CLOSE,
        LPROC_MD_CREATE,
        LPROC_MD_ENQUEUE,
+       LPROC_MD_ENQUEUE_ASYNC,
        LPROC_MD_GETATTR,
        LPROC_MD_INTENT_LOCK,
        LPROC_MD_LINK,
@@ -1508,6 +1509,29 @@ static inline int md_enqueue(struct obd_export *exp,
                                                             extra_lock_flags);
 }
 
+static inline int md_enqueue_async(struct obd_export *exp,
+                                  struct ldlm_enqueue_info *einfo,
+                                  obd_enqueue_update_f upcall,
+                                  struct md_op_data *op_data,
+                                  const union ldlm_policy_data *policy,
+                                  __u64 lock_flags)
+{
+       int rc;
+
+       ENTRY;
+       rc = exp_check_ops(exp);
+       if (rc)
+               RETURN(rc);
+
+       lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
+                            LPROC_MD_ENQUEUE_ASYNC);
+
+       rc = exp->exp_obd->obd_type->typ_md_ops->m_enqueue_async(exp, einfo,
+                       upcall, op_data,
+                       policy, lock_flags);
+       RETURN(rc);
+}
+
 static inline int md_getattr_name(struct obd_export *exp,
                                  struct md_op_data *op_data,
                                  struct ptlrpc_request **request)
index 5eee474..46cdda0 100644 (file)
@@ -70,6 +70,15 @@ ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
                 lock->l_policy_data.l_flock.start));
 }
 
+static int ldlm_flocks_are_equal(struct ldlm_lock *l1, struct ldlm_lock *l2)
+{
+       return ldlm_same_flock_owner(l1, l2) &&
+              l1->l_policy_data.l_flock.start ==
+              l2->l_policy_data.l_flock.start &&
+              l1->l_policy_data.l_flock.end ==
+              l2->l_policy_data.l_flock.end;
+}
+
 static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
                                            struct ldlm_lock *lock)
 {
@@ -349,6 +358,46 @@ reprocess:
                if (end < OBD_OBJECT_EOF)
                        end++;
        }
+
+       if (*flags != LDLM_FL_WAIT_NOREPROC && mode == LCK_NL) {
+               /* Scan the resource lr_waiting list for a lock matching
+                * the request so it can be canceled.
+                */
+#ifdef HAVE_SERVER_SUPPORT
+               list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
+                       LASSERT(lock->l_req_mode != LCK_NL);
+
+                       if (ldlm_flocks_are_equal(req, lock)) {
+                               /* Start canceling the waiting lock */
+                               LIST_HEAD(rpc_list);
+
+                               LDLM_DEBUG(lock, "server-side: cancel waiting");
+                               /* client receives cancelled lock as granted
+                                * with l_granted_mode == 0
+                                */
+                               LASSERT(lock->l_granted_mode == LCK_MINMODE);
+                               lock->l_flags |= LDLM_FL_AST_SENT;
+                               ldlm_resource_unlink_lock(lock);
+                               ldlm_add_ast_work_item(lock, NULL, &rpc_list);
+                               LDLM_LOCK_GET(lock);
+                               unlock_res_and_lock(req);
+                               ldlm_run_ast_work(ns, &rpc_list,
+                                                 LDLM_WORK_CP_AST);
+                               ldlm_lock_cancel(lock);
+                               LDLM_LOCK_RELEASE(lock);
+                               lock_res_and_lock(req);
+                               break;
+                       }
+               }
+#else /* !HAVE_SERVER_SUPPORT */
+               /* The only way the flock policy function can be called on
+                * the client side is from ldlm_flock_completion_ast(), which
+                * always carries the LDLM_FL_WAIT_NOREPROC flag.
+                */
+               CERROR("Illegal parameter for client-side-only module.\n");
+               LBUG();
+#endif /* HAVE_SERVER_SUPPORT */
+       }
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop collects all overlapping locks with the
                 * same owner.
@@ -457,6 +506,12 @@ reprocess:
        for (lock = ownlocks; lock; lock = nextlock) {
                nextlock = lock->l_same_owner;
 
+               /* lock was granted by ldlm_lock_enqueue()
+                * but not processed yet
+                */
+               if (*flags == LDLM_FL_WAIT_NOREPROC && lock->l_ast_data)
+                       continue;
+
                if (lock->l_granted_mode == mode) {
                        /*
                         * If the modes are the same then we need to process
@@ -633,6 +688,76 @@ restart:
        RETURN(LDLM_ITER_CONTINUE);
 }
 
+static void ldlm_flock_mark_canceled(struct ldlm_lock *lock)
+{
+       struct ldlm_flock_info *args;
+       struct ldlm_lock *waiting_lock = NULL;
+       struct ldlm_resource *res = lock->l_resource;
+
+       ENTRY;
+       check_res_locked(res);
+       list_for_each_entry(waiting_lock, &res->lr_enqueueing, l_res_link) {
+               if (ldlm_flocks_are_equal(waiting_lock, lock)) {
+                       LDLM_DEBUG(lock, "mark canceled enqueueing lock");
+                       args = waiting_lock->l_ast_data;
+                       if (args)
+                               args->fa_flags |= FA_FL_CANCELED;
+                       RETURN_EXIT;
+               }
+       }
+       list_for_each_entry(waiting_lock, &res->lr_waiting, l_res_link) {
+               if (ldlm_flocks_are_equal(waiting_lock, lock)) {
+                       LDLM_DEBUG(lock, "mark canceled waiting lock");
+                       args = waiting_lock->l_ast_data;
+                       if (args)
+                               args->fa_flags |= FA_FL_CANCELED;
+                       RETURN_EXIT;
+               }
+       }
+       EXIT;
+}
+
+static int ldlm_flock_completion_common(struct ldlm_lock *lock)
+{
+       struct ldlm_flock_info *args = lock->l_ast_data;
+       int rc = 0;
+
+       /* Protect against race where lock could have been just destroyed
+        * due to overlap in ldlm_process_flock_lock().
+        */
+       if (lock->l_flags & LDLM_FL_DESTROYED) {
+               LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
+               return -EIO;
+       }
+
+       /* Import invalidation. We need to actually release the lock
+        * references being held, so that it can go away. No point in
+        * holding the lock even if app still believes it has it, since
+        * server already dropped it anyway. Only for granted locks too.
+        * Do the same for DEADLOCK'ed locks.
+        */
+       if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
+               enum ldlm_mode mode = args ?
+                                     args->fa_mode : lock->l_granted_mode;
+
+               /* args is NULL only for granted locks */
+               LASSERT(args != NULL ||
+                       lock->l_req_mode == lock->l_granted_mode);
+
+               if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
+                       LDLM_DEBUG(lock,
+                                  "client-side enqueue deadlock received");
+                       rc = -EDEADLK;
+               } else {
+                       LDLM_DEBUG(lock, "client-side lock cleanup");
+                       rc = -EIO;
+               }
+               ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
+       }
+
+       return rc;
+}
+
 /**
  * Flock completion callback function.
  *
@@ -642,11 +767,30 @@ restart:
  *
  * \retval 0    : success
  * \retval <0   : failure
+ *
+ * This function is called from:
+ * 1. ldlm_cli_enqueue_fini()
+ *     a) grant a new lock or UNLOCK(l_granted_mode == LCK_NL) lock
+ *     b) TEST lock, l_flags & LDLM_FL_TEST_LOCK; if can be granted
+ *        server returns a conflicting lock, otherwise
+ *        l_granted_mode == LCK_NL
+ * 2. ldlm_handle_cp_callback()
+ *     a) grant a new lock
+ *     b) cancel a DEADLOCK'ed lock, l_flags & LDLM_FL_FLOCK_DEADLOCK,
+ *        l_granted_mode == 0
+ * c) cancel an async waiting lock (F_CANCELLK), fa_flags & FA_FL_CANCELED,
+ *        l_granted_mode == 0
+ * 3. cleanup_resource() (called only for the forced umount case)
+ *     a) a granted or waiting lock is to be destroyed,
+ *     lock->l_flags & flags have LDLM_FL_FAILED.
+ * 4. races between the 3 above
+ *     a) cleanup vs. reply or CP AST
+ *     b) F_CANCELLK vs. CP AST granting a new lock
  */
 int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 {
-       struct file_lock *getlk = lock->l_ast_data;
+       struct ldlm_flock_info *args;
        struct obd_device *obd;
        enum ldlm_error err;
        int rc = 0;
@@ -662,8 +806,8 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                unlock_res_and_lock(lock);
                CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
        }
-       CDEBUG(D_DLMTRACE, "flags: %#llx data: %p getlk: %p\n",
-              flags, data, getlk);
+       CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p l_ast_data: %p\n",
+              flags, data, lock->l_ast_data);
 
        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
@@ -723,44 +867,9 @@ granted:
 
        lock_res_and_lock(lock);
 
-
-       /* Protect against race where lock could have been just destroyed
-        * due to overlap in ldlm_process_flock_lock().
-        */
-       if (ldlm_is_destroyed(lock)) {
-               unlock_res_and_lock(lock);
-               LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
-
-               /* error is returned up to ldlm_cli_enqueue_fini() caller. */
-               RETURN(-EIO);
-       }
-
-       /* ldlm_lock_enqueue() has already placed lock on the granted list. */
-       ldlm_resource_unlink_lock(lock);
-
-       /* Import invalidation. We need to actually release the lock
-        * references being held, so that it can go away. No point in
-        * holding the lock even if app still believes it has it, since
-        * server already dropped it anyway. Only for granted locks too.
-        */
-       /* Do the same for DEADLOCK'ed locks. */
-       if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
-               int mode;
-
-               if (flags & LDLM_FL_TEST_LOCK)
-                       LASSERT(ldlm_is_test_lock(lock));
-
-               if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
-                       mode = getlk->C_FLC_TYPE;
-               else
-                       mode = lock->l_req_mode;
-
-               if (ldlm_is_flock_deadlock(lock)) {
-                       LDLM_DEBUG(lock,
-                                  "client-side enqueue deadlock received");
-                       rc = -EDEADLK;
-               }
-               ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
+       rc = ldlm_flock_completion_common(lock);
+       if (rc) {
+               lock->l_ast_data = NULL;
                unlock_res_and_lock(lock);
 
                /* Need to wake up the waiter if we were evicted */
@@ -769,19 +878,33 @@ granted:
                /* An error is still to be returned, to propagate it up to
                 * ldlm_cli_enqueue_fini() caller.
                 */
-               RETURN(rc ? : -EIO);
+               RETURN(rc);
+       }
+
+       args = lock->l_ast_data;
+
+       if (lock->l_granted_mode == LCK_MINMODE) {
+               ldlm_flock_destroy(lock, args->fa_mode, LDLM_FL_WAIT_NOREPROC);
+               lock->l_ast_data = NULL;
+               unlock_res_and_lock(lock);
+               CERROR("%s: client-side: only asynchronous lock enqueue can be canceled by CANCELK\n",
+               lock->l_export->exp_obd->obd_name);
+               RETURN(-EIO);
+       }
+
+       if (args->fa_flags & FA_FL_CANCEL_RQST) {
+               LDLM_DEBUG(lock, "client-side granted CANCELK lock");
+               ldlm_flock_mark_canceled(lock);
        }
 
        LDLM_DEBUG(lock, "client-side enqueue granted");
 
        if (flags & LDLM_FL_TEST_LOCK) {
-               /*
-                * fcntl(F_GETLK) request
-                * The old mode was saved in getlk->C_FLC_TYPE so that if the mode
-                * in the lock changes we can decref the appropriate refcount.
-                */
+               struct file_lock *getlk = args->fa_fl;
+               /* fcntl(F_GETLK) request */
                LASSERT(ldlm_is_test_lock(lock));
-               ldlm_flock_destroy(lock, getlk->C_FLC_TYPE, LDLM_FL_WAIT_NOREPROC);
+               ldlm_flock_destroy(lock, args->fa_mode, LDLM_FL_WAIT_NOREPROC);
+
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        getlk->C_FLC_TYPE = F_RDLCK;
@@ -798,16 +921,96 @@ granted:
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
 
+               /* ldlm_lock_enqueue() has already placed lock on the granted
+                * list.
+                */
+               ldlm_resource_unlink_lock(lock);
+
                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process.
                 */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
+       lock->l_ast_data = NULL;
        unlock_res_and_lock(lock);
        RETURN(rc);
 }
 EXPORT_SYMBOL(ldlm_flock_completion_ast);
 
+/* This function is called in the same cases as ldlm_flock_completion_ast(),
+ * except for UNLOCK, TEST lock and F_CANCELLK, which use only the
+ * synchronous mechanism.
+ */
+struct ldlm_flock_info *
+ldlm_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
+{
+       __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
+       enum ldlm_error err;
+       int rc;
+       struct ldlm_flock_info *args;
+
+       ENTRY;
+       LDLM_DEBUG(lock, "flags: 0x%llx data: %p l_ast_data: %p",
+                  flags, data, lock->l_ast_data);
+
+       LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
+
+       lock_res_and_lock(lock);
+
+       args = lock->l_ast_data;
+       rc = ldlm_flock_completion_common(lock);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       if (lock->l_granted_mode != LCK_NL) {
+               if (args == NULL) {
+                       LDLM_DEBUG(lock,
+                                  "client-side lock is already granted in a race");
+                       LASSERT(lock->l_granted_mode == lock->l_req_mode);
+                       LASSERT(lock->l_granted_mode != LCK_MINMODE);
+                       GOTO(out, rc = 0);
+               }
+
+               if (args->fa_flags & FA_FL_CANCELED ||
+                   ((flags & LDLM_FL_BLOCKED_MASK) == 0 &&
+                    lock->l_granted_mode == LCK_MINMODE)) {
+                       LDLM_DEBUG(lock, "client-side granted canceled lock");
+                       ldlm_flock_destroy(lock, args->fa_mode,
+                                          LDLM_FL_WAIT_NOREPROC);
+                       GOTO(out, rc = -EIO);
+               }
+       }
+
+       if (flags & LDLM_FL_BLOCKED_MASK) {
+               LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock");
+               args = NULL;
+               GOTO(out, rc = 0);
+       }
+
+       if (data != NULL)
+               LDLM_DEBUG(lock, "client-side granted a blocked lock");
+       else
+               LDLM_DEBUG(lock, "client-side lock granted");
+
+       /* ldlm_lock_enqueue() has already placed lock on the granted list. */
+       ldlm_resource_unlink_lock(lock);
+
+       /* We need to reprocess the lock to do merges or splits
+        * with existing locks owned by this process.
+        */
+       ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
+
+out:
+       if (args != NULL) {
+               lock->l_ast_data = NULL;
+               args->fa_err = rc;
+       }
+       unlock_res_and_lock(lock);
+
+       RETURN(args);
+}
+EXPORT_SYMBOL(ldlm_flock_completion_ast_async);
+
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
 {
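The async completion AST hands ownership of the detached ldlm_flock_info back
to its caller, which must deliver fa_err and free the structure; a condensed
sketch of that contract (notify_waiter() is hypothetical; compare
ll_flock_completion_ast_async() and ll_flock_run_flock_cb() in llite/file.c
below):

static void notify_waiter(struct ldlm_flock_info *args);  /* hypothetical */

static int example_cp_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct ldlm_flock_info *args;

        args = ldlm_flock_completion_ast_async(lock, flags, data);
        if (args != NULL) {
                /* granted, canceled or failed: args->fa_err holds the result
                 * and lock->l_ast_data has already been cleared
                 */
                notify_waiter(args);
                OBD_FREE_PTR(args);
        }
        /* NULL means the lock is still blocked; wait for the next CP AST */
        return 0;
}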
index 4c4f09a..d6bcd6b 100644 (file)
@@ -945,6 +945,15 @@ struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len)
 }
 EXPORT_SYMBOL(ldlm_enqueue_pack);
 
+static void ldlm_lock_add_to_enqueueing(struct ldlm_lock *lock)
+{
+       struct ldlm_resource *res = lock->l_resource;
+
+       lock_res(res);
+       ldlm_resource_add_lock(res, &res->lr_enqueueing, lock);
+       unlock_res(res);
+}
+
 /**
  * Client-side lock enqueue.
  *
@@ -1013,6 +1022,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                                LBUG();
 
                        lock->l_req_extent = policy->l_extent;
+               } else if (einfo->ei_type == LDLM_FLOCK) {
+                       ldlm_lock_add_to_enqueueing(lock);
                }
                LDLM_DEBUG(lock, "client-side enqueue START, flags %#llx",
                           *flags);
index 167ae0d..2c453f9 100644 (file)
@@ -1477,6 +1477,7 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
 
        INIT_LIST_HEAD(&res->lr_granted);
        INIT_LIST_HEAD(&res->lr_waiting);
+       INIT_LIST_HEAD(&res->lr_enqueueing);
 
        refcount_set(&res->lr_refcount, 1);
        spin_lock_init(&res->lr_lock);
@@ -1617,6 +1618,11 @@ static void __ldlm_resource_putref_final(struct cfs_hash_bd *bd,
                LBUG();
        }
 
+       if (!list_empty(&res->lr_enqueueing)) {
+               ldlm_resource_dump(D_ERROR, res);
+               LBUG();
+       }
+
        cfs_hash_bd_del_locked(nsb->nsb_namespace->ns_rs_hash,
                               bd, &res->lr_hash);
        if (atomic_dec_and_test(&nsb->nsb_count))
@@ -1673,6 +1679,8 @@ static void __ldlm_resource_add_lock(struct ldlm_resource *res,
 
        if (res->lr_type == LDLM_IBITS)
                ldlm_inodebits_add_lock(res, head, lock, tail);
+       else if (res->lr_type == LDLM_FLOCK)
+               LASSERT(lock->l_req_mode != LCK_NL || head != &res->lr_waiting);
 
        ldlm_resource_dump(D_INFO, res);
 }
index 4d364de..0dc7d07 100644 (file)
@@ -5244,44 +5244,25 @@ int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
        RETURN(rc);
 }
 
-static int
-ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
+static int ll_file_flc2policy(struct file_lock *file_lock, int cmd,
+                      union ldlm_policy_data *flock)
 {
-       struct inode *inode = file_inode(file);
-       struct ll_sb_info *sbi = ll_i2sbi(inode);
-       struct ldlm_enqueue_info einfo = {
-               .ei_type        = LDLM_FLOCK,
-               .ei_cb_cp       = ldlm_flock_completion_ast,
-               .ei_cbdata      = file_lock,
-       };
-       struct md_op_data *op_data;
-       struct lustre_handle lockh = { 0 };
-       union ldlm_policy_data flock = { { 0 } };
-       struct file_lock flbuf = *file_lock;
-       int fl_type = file_lock->C_FLC_TYPE;
-       ktime_t kstart = ktime_get();
-       __u64 flags = 0;
-       int rc;
-       int rc2 = 0;
-
        ENTRY;
-       CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID" file_lock=%p\n",
-              PFID(ll_inode2fid(inode)), file_lock);
 
        if (file_lock->C_FLC_FLAGS & FL_FLOCK) {
                LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
                /* flocks are whole-file locks */
-               flock.l_flock.end = OFFSET_MAX;
+               flock->l_flock.end = OFFSET_MAX;
         /* For flocks the owner is determined by the local file descriptor */
-               flock.l_flock.owner = (unsigned long)file_lock->C_FLC_FILE;
+               flock->l_flock.owner = (unsigned long)file_lock->C_FLC_FILE;
        } else if (file_lock->C_FLC_FLAGS & FL_POSIX) {
-               flock.l_flock.owner = (unsigned long)file_lock->C_FLC_OWNER;
-               flock.l_flock.start = file_lock->fl_start;
-               flock.l_flock.end = file_lock->fl_end;
+               flock->l_flock.owner = (unsigned long)file_lock->C_FLC_OWNER;
+               flock->l_flock.start = file_lock->fl_start;
+               flock->l_flock.end = file_lock->fl_end;
        } else {
                RETURN(-EINVAL);
        }
-       flock.l_flock.pid = file_lock->C_FLC_PID;
+       flock->l_flock.pid = file_lock->C_FLC_PID;
 
 #if defined(HAVE_LM_COMPARE_OWNER) || defined(lm_compare_owner)
        /* Somewhat ugly workaround for svc lockd.
@@ -5293,8 +5274,208 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
         * pointer space for current->files are not intersecting
         */
        if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
-               flock.l_flock.owner = (unsigned long)file_lock->C_FLC_PID;
+               flock->l_flock.owner = (unsigned long)file_lock->C_FLC_PID;
+#endif
+
+       RETURN(0);
+}
+
+static int ll_file_flock_lock(struct file *file, struct file_lock *file_lock)
+{
+       int rc = -EINVAL;
+
+       /* We don't need to sleep on conflicting locks.
+        * It is called in the following use cases:
+        * 1. adding a new lock - no conflicts exist, as it is already granted
+        *    on the server.
+        * 2. unlock - never conflicts with anything.
+        */
+       file_lock->fl_flags &= ~FL_SLEEP;
+#ifdef HAVE_LOCKS_LOCK_FILE_WAIT
+       rc = locks_lock_file_wait(file, file_lock);
+#else
+       if (file_lock->fl_flags & FL_FLOCK) {
+               rc = flock_lock_file_wait(file, file_lock);
+       } else if (file_lock->fl_flags & FL_POSIX) {
+               rc = posix_lock_file(file, file_lock, NULL);
+       }
+#endif /* HAVE_LOCKS_LOCK_FILE_WAIT */
+       if (rc)
+               CDEBUG_LIMIT(rc == -ENOENT ? D_DLMTRACE : D_ERROR,
+                      "kernel lock failed: rc = %d\n", rc);
+
+       return rc;
+}
+
+static int ll_flock_upcall(void *cookie, int err);
+static int
+ll_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data);
+
+static int ll_file_flock_async_unlock(struct inode *inode,
+                                     struct file_lock *file_lock)
+{
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+       struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
+                                          .ei_cb_cp =
+                                             ll_flock_completion_ast_async,
+                                          .ei_mode = LCK_NL,
+                                          .ei_cbdata = NULL };
+       union ldlm_policy_data flock = { {0} };
+       struct md_op_data *op_data;
+       int rc;
+
+       ENTRY;
+       rc = ll_file_flc2policy(file_lock, F_SETLK, &flock);
+       if (rc)
+               RETURN(rc);
+
+       op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
+                                    LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data))
+               RETURN(PTR_ERR(op_data));
+
+       rc = md_enqueue_async(sbi->ll_md_exp, &einfo, ll_flock_upcall,
+                             op_data, &flock, 0);
+
+       ll_finish_md_op_data(op_data);
+
+       RETURN(rc);
+}
+
+/* This function is called only once, after the ldlm callback. The args are
+ * already detached from the lock, so no locking is needed.
+ * It only reports the lock status to the kernel.
+ */
+static void ll_file_flock_async_cb(struct ldlm_flock_info *args)
+{
+       struct file_lock *file_lock = args->fa_fl;
+       struct file_lock *flc = &args->fa_flc;
+       struct file *file = args->fa_file;
+       struct inode *inode = file->f_path.dentry->d_inode;
+       int err = args->fa_err;
+       int rc;
+
+       ENTRY;
+       CDEBUG(D_INFO, "err=%d file_lock=%p file=%p start=%llu end=%llu\n",
+              err, file_lock, file, flc->fl_start, flc->fl_end);
+
+       /* The kernel is responsible for resolving grant vs. F_CANCELLK or
+        * grant vs. cleanup races; it may happen that the CANCELED flag
+        * isn't set and err == 0, because F_CANCELLK/cleanup happens between
+        * ldlm_flock_completion_ast_async() and ll_flock_run_flock_cb().
+        * In this case notify() returns an error for the already canceled
+        * flock.
+        */
+       if (!(args->fa_flags & FA_FL_CANCELED)) {
+               struct file_lock notify_lock;
+
+               locks_init_lock(&notify_lock);
+               locks_copy_lock(&notify_lock, flc);
+
+               if (err == 0)
+                       ll_file_flock_lock(file, flc);
+
+               wait_event_idle(args->fa_waitq, args->fa_ready);
+
+#ifdef HAVE_LM_GRANT_2ARGS
+               rc = args->fa_notify(&notify_lock, err);
+#else
+               rc = args->fa_notify(&notify_lock, NULL, err);
 #endif
+               if (rc) {
+                       CDEBUG_LIMIT(D_ERROR,
+                                    "notify failed file_lock=%p err=%d\n",
+                                    file_lock, err);
+                       if (err == 0) {
+                               flc->C_FLC_TYPE = F_UNLCK;
+                               ll_file_flock_lock(file, flc);
+                               ll_file_flock_async_unlock(inode, flc);
+                       }
+               }
+       }
+
+       fput(file);
+
+       EXIT;
+}
+
+static void ll_flock_run_flock_cb(struct ldlm_flock_info *args)
+{
+       if (args) {
+               ll_file_flock_async_cb(args);
+               OBD_FREE_PTR(args);
+       }
+}
+
+static int ll_flock_upcall(void *cookie, int err)
+{
+       struct ldlm_flock_info *args;
+       struct ldlm_lock *lock = cookie;
+
+       if (err != 0) {
+               CERROR("ldlm_cli_enqueue_fini lock=%p : rc = %d\n", lock, err);
+
+               lock_res_and_lock(lock);
+               args = lock->l_ast_data;
+               lock->l_ast_data = NULL;
+               unlock_res_and_lock(lock);
+
+               if (args)
+                       args->fa_err = err;
+               ll_flock_run_flock_cb(args);
+       }
+
+       return 0;
+}
+
+static int
+ll_flock_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
+{
+       struct ldlm_flock_info *args;
+
+       ENTRY;
+
+       args = ldlm_flock_completion_ast_async(lock, flags, data);
+       if (args && args->fa_flags & FA_FL_CANCELED) {
+               /* lock was cancelled in a race */
+               struct inode *inode = args->fa_file->f_path.dentry->d_inode;
+
+               ll_file_flock_async_unlock(inode, &args->fa_flc);
+       }
+
+       ll_flock_run_flock_cb(args);
+
+       RETURN(0);
+}
+
+static int
+ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
+{
+       struct inode *inode = file_inode(file);
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+       struct ldlm_enqueue_info einfo = {
+               .ei_type        = LDLM_FLOCK,
+               .ei_cb_cp       = ldlm_flock_completion_ast,
+               .ei_cbdata      = NULL,
+       };
+       struct md_op_data *op_data;
+       struct lustre_handle lockh = { 0 };
+       union ldlm_policy_data flock = { { 0 } };
+       struct file_lock flbuf = *file_lock;
+       int fl_type = file_lock->C_FLC_TYPE;
+       ktime_t kstart = ktime_get();
+       __u64 flags = 0;
+       struct ldlm_flock_info *cb_data = NULL;
+       int rc;
+
+       ENTRY;
+       CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID" file_lock=%p\n",
+              PFID(ll_inode2fid(inode)), file_lock);
+
+       ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
+
+       rc = ll_file_flc2policy(file_lock, cmd, &flock);
+       if (rc)
+               RETURN(rc);
 
        switch (fl_type) {
        case F_RDLCK:
@@ -5346,6 +5527,13 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                 */
                posix_test_lock(file, &flbuf);
                break;
+       case F_CANCELLK:
+               CDEBUG(D_DLMTRACE, "F_CANCELLK owner=%llx %llu-%llu\n",
+                      flock.l_flock.owner, flock.l_flock.start,
+                      flock.l_flock.end);
+               file_lock->C_FLC_TYPE = F_UNLCK;
+               einfo.ei_mode = LCK_NL;
+               break;
        default:
                rc = -EINVAL;
                CERROR("%s: fcntl from '%s' unknown lock command=%d: rc = %d\n",
@@ -5353,51 +5541,102 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                RETURN(rc);
        }
 
-       /* Save the old mode so that if the mode in the lock changes we
-        * can decrement the appropriate reader or writer refcount.
-        */
-       file_lock->C_FLC_TYPE = einfo.ei_mode;
+       CDEBUG(D_DLMTRACE,
+              "inode="DFID", pid=%u, owner=%#llx, flags=%#llx, mode=%u, start=%llu, end=%llu\n",
+              PFID(ll_inode2fid(inode)), flock.l_flock.pid,
+              flock.l_flock.owner, flags, einfo.ei_mode,
+              flock.l_flock.start, flock.l_flock.end);
 
        op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
                                     LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));
 
-       CDEBUG(D_DLMTRACE,
-              "inode="DFID", pid=%u, flags=%#llx, mode=%u, start=%llu, end=%llu\n",
-              PFID(ll_inode2fid(inode)),
-              flock.l_flock.pid, flags, einfo.ei_mode,
-              flock.l_flock.start, flock.l_flock.end);
+       OBD_ALLOC_PTR(cb_data);
+       if (!cb_data)
+               GOTO(out, rc = -ENOMEM);
 
-       rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data, &lockh,
-                       flags);
+       cb_data->fa_file = file;
+       cb_data->fa_fl = file_lock;
+       cb_data->fa_mode = einfo.ei_mode;
+       init_waitqueue_head(&cb_data->fa_waitq);
+       locks_init_lock(&cb_data->fa_flc);
+       locks_copy_lock(&cb_data->fa_flc, file_lock);
+       if (cmd == F_CANCELLK)
+               cb_data->fa_flags |= FA_FL_CANCEL_RQST;
+       einfo.ei_cbdata = cb_data;
+
+       if (file_lock->fl_lmops && file_lock->fl_lmops->lm_grant &&
+           file_lock->C_FLC_TYPE != F_UNLCK &&
+           flags == LDLM_FL_BLOCK_NOWAIT /* F_SETLK/F_SETLK64 */) {
+
+               cb_data->fa_notify = file_lock->fl_lmops->lm_grant;
+               flags = (file_lock->fl_flags & FL_SLEEP) ?
+                       0 : LDLM_FL_BLOCK_NOWAIT;
+               einfo.ei_cb_cp = ll_flock_completion_ast_async;
+               get_file(file);
+
+               rc = md_enqueue_async(sbi->ll_md_exp, &einfo,
+                                     ll_flock_upcall, op_data, &flock, flags);
+               if (rc) {
+                       fput(file);
+                       OBD_FREE_PTR(cb_data);
+                       cb_data = NULL;
+               } else {
+                       rc = FILE_LOCK_DEFERRED;
+               }
+       } else {
+               if (file_lock->C_FLC_TYPE == F_UNLCK &&
+                   flags != LDLM_FL_TEST_LOCK) {
+                       /* We release the kernel lock before the ldlm one to
+                        * avoid a race with reordering of unlock and lock
+                        * responses from the server.
+                        */
+                       cb_data->fa_flc.fl_flags |= FL_EXISTS;
+                       rc = ll_file_flock_lock(file, &cb_data->fa_flc);
+                       if (rc) {
+                               if (rc == -ENOENT) {
+                                       if (!(file_lock->C_FLC_FLAGS &
+                                                               FL_EXISTS))
+                                               rc = 0;
+                               } else {
+                                       CDEBUG_LIMIT(D_ERROR,
+                                              "local unlock failed rc=%d\n",
+                                              rc);
+                               }
+                               OBD_FREE_PTR(cb_data);
+                               cb_data = NULL;
+                               GOTO(out, rc);
+                       }
+               }
 
-       /* Restore the file lock type if not TEST lock. */
-       if (!(flags & LDLM_FL_TEST_LOCK))
-               file_lock->C_FLC_TYPE = fl_type;
+               rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data,
+                               &lockh, flags);
 
-#ifdef HAVE_LOCKS_LOCK_FILE_WAIT
-       if ((rc == 0 || file_lock->C_FLC_TYPE == F_UNLCK) &&
-           !(flags & LDLM_FL_TEST_LOCK))
-               rc2  = locks_lock_file_wait(file, file_lock);
-#else
-       if ((file_lock->C_FLC_FLAGS & FL_FLOCK) &&
-           (rc == 0 || file_lock->C_FLC_TYPE == F_UNLCK))
-               rc2  = flock_lock_file_wait(file, file_lock);
-       if ((file_lock->C_FLC_FLAGS & FL_POSIX) &&
-           (rc == 0 || file_lock->C_FLC_TYPE == F_UNLCK) &&
-           !(flags & LDLM_FL_TEST_LOCK))
-               rc2  = posix_lock_file_wait(file, file_lock);
-#endif /* HAVE_LOCKS_LOCK_FILE_WAIT */
 
-       if (rc2 && file_lock->C_FLC_TYPE != F_UNLCK) {
-               einfo.ei_mode = LCK_NL;
-               md_enqueue(sbi->ll_md_exp, &einfo, &flock, op_data,
-                          &lockh, flags);
-               rc = rc2;
-       }
+               if (!rc && file_lock->C_FLC_TYPE != F_UNLCK &&
+                   !(flags & LDLM_FL_TEST_LOCK)) {
+                       int rc2;
 
+                       rc2 = ll_file_flock_lock(file, file_lock);
+
+                       if (rc2) {
+                               einfo.ei_mode = LCK_NL;
+                               cb_data->fa_mode = einfo.ei_mode;
+                               md_enqueue(sbi->ll_md_exp, &einfo, &flock,
+                                          op_data, &lockh, flags);
+                               rc = rc2;
+                       }
+               }
+               OBD_FREE_PTR(cb_data);
+               cb_data = NULL;
+       }
+out:
        ll_finish_md_op_data(op_data);
+       if (cb_data) {
+               cb_data->fa_ready = 1;
+               wake_up(&cb_data->fa_waitq);
+       }
 
        if (rc == 0 && (flags & LDLM_FL_TEST_LOCK) &&
            flbuf.C_FLC_TYPE != file_lock->C_FLC_TYPE) { /* Verify local & remote */
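On the kernel side, a deferred request is withdrawn through vfs_cancel_lock(),
which invokes the filesystem ->lock() method with F_CANCELLK; the F_CANCELLK
branch above then enqueues an unlock (LCK_NL) tagged FA_FL_CANCEL_RQST. A
minimal sketch of the calling side (the example_* name is hypothetical):

#include <linux/fs.h>

static void example_cancel_deferred(struct file *file, struct file_lock *fl)
{
        /* reaches ll_file_flock(file, F_CANCELLK, fl) on a Lustre file */
        int rc = vfs_cancel_lock(file, fl);

        if (rc)
                pr_debug("cancel raced with a grant: rc = %d\n", rc);
}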
index 03b257b..c5623e7 100644 (file)
@@ -2318,6 +2318,34 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
        RETURN(rc);
 }
 
+static int
+lmv_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                 obd_enqueue_update_f upcall, struct md_op_data *op_data,
+                 const union ldlm_policy_data *policy, __u64 flags)
+{
+       struct obd_device        *obd = exp->exp_obd;
+       struct lmv_obd           *lmv = &obd->u.lmv;
+       struct lmv_tgt_desc      *tgt;
+       int                       rc;
+
+       ENTRY;
+
+       CDEBUG(D_INODE, "ENQUEUE ASYNC on "DFID"\n",
+                       PFID(&op_data->op_fid1));
+
+       tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
+       CDEBUG(D_INODE, "ENQUEUE ASYNC on "DFID" -> mds #%d\n",
+              PFID(&op_data->op_fid1), tgt->ltd_index);
+
+       rc = md_enqueue_async(tgt->ltd_exp, einfo, upcall, op_data, policy,
+                             flags);
+
+       RETURN(rc);
+}
+
 int
 lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
                 struct ptlrpc_request **preq)
@@ -4463,6 +4491,7 @@ static const struct md_ops lmv_md_ops = {
        .m_close                = lmv_close,
        .m_create               = lmv_create,
        .m_enqueue              = lmv_enqueue,
+       .m_enqueue_async        = lmv_enqueue_async,
        .m_getattr              = lmv_getattr,
        .m_getxattr             = lmv_getxattr,
        .m_getattr_name         = lmv_getattr_name,
index 197f864..d07e7e8 100644 (file)
@@ -72,6 +72,11 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                const union ldlm_policy_data *policy,
                struct md_op_data *op_data,
                struct lustre_handle *lockh, __u64 extra_lock_flags);
+
+int mdc_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                     obd_enqueue_update_f upcall, struct md_op_data *op_data,
+                     const union ldlm_policy_data *policy, __u64 lock_flags);
+
 int mdc_resource_get_unused_res(struct obd_export *exp,
                                struct ldlm_res_id *res_id,
                                struct list_head *cancels,
index 7b44133..a772865 100644 (file)
@@ -33,6 +33,14 @@ struct mdc_getattr_args {
        struct md_op_item       *ga_item;
 };
 
+struct mdc_enqueue_args {
+       struct ldlm_lock                *mea_lock;
+       struct obd_export               *mea_exp;
+       enum ldlm_mode                  mea_mode;
+       __u64                           mea_flags;
+       obd_enqueue_update_f            mea_upcall;
+};
+
 int it_open_error(int phase, struct lookup_intent *it)
 {
        if (it_disposition(it, DISP_OPEN_LEASE)) {
@@ -1180,6 +1188,86 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                                op_data, lockh, extra_lock_flags);
 }
 
+static int mdc_enqueue_async_interpret(const struct lu_env *env,
+                                      struct ptlrpc_request *req,
+                                      void *args, int rc)
+{
+       struct mdc_enqueue_args *mea = args;
+       struct obd_export       *exp = mea->mea_exp;
+       struct ldlm_lock        *lock = mea->mea_lock;
+       struct lustre_handle    lockh;
+       struct ldlm_enqueue_info  einfo = {
+                       .ei_type = LDLM_FLOCK,
+                       .ei_mode = mea->mea_mode,
+       };
+
+       ENTRY;
+       CDEBUG(D_INFO, "req=%p rc=%d\n", req, rc);
+
+       ldlm_lock2handle(lock, &lockh);
+       rc = ldlm_cli_enqueue_fini(exp, &req->rq_pill, &einfo, 1,
+                                 &mea->mea_flags, NULL, 0, &lockh, rc, true);
+       if (rc == -ENOLCK)
+               LDLM_LOCK_RELEASE(lock);
+
+       /* we expect failed_lock_cleanup() to destroy lock */
+       if (rc != 0)
+               LASSERT(list_empty(&lock->l_res_link));
+
+       if (mea->mea_upcall != NULL)
+               mea->mea_upcall(lock, rc);
+
+       LDLM_LOCK_PUT(lock);
+
+       RETURN(rc);
+}
+
+int mdc_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                     obd_enqueue_update_f upcall, struct md_op_data *op_data,
+                     const union ldlm_policy_data *policy, __u64 flags)
+{
+       struct mdc_enqueue_args *mea;
+       struct ptlrpc_request *req;
+       int                    rc;
+       struct ldlm_res_id res_id;
+       struct lustre_handle lockh;
+
+       ENTRY;
+       fid_build_reg_res_name(&op_data->op_fid1, &res_id);
+
+       LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
+                einfo->ei_type);
+       res_id.name[3] = LDLM_FLOCK;
+
+       req = ldlm_enqueue_pack(exp, 0);
+       if (IS_ERR(req))
+               RETURN(PTR_ERR(req));
+
+       einfo->ei_req_slot = 1;
+       einfo->ei_mod_slot = 1;
+
+       rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
+                             0, 0, &lockh, 1);
+       if (rc) {
+               ptlrpc_req_put(req);
+               RETURN(rc);
+       }
+
+       mea = ptlrpc_req_async_args(mea, req);
+       mea->mea_exp = exp;
+       mea->mea_lock = ldlm_handle2lock(&lockh);
+       LASSERT(mea->mea_lock != NULL);
+
+       mea->mea_mode = einfo->ei_mode;
+       mea->mea_flags = flags;
+       mea->mea_upcall = upcall;
+
+       req->rq_interpret_reply = mdc_enqueue_async_interpret;
+       ptlrpcd_add_req(req);
+
+       RETURN(0);
+}
+
 static int mdc_finish_intent_lock(struct obd_export *exp,
                                  struct ptlrpc_request *request,
                                  struct md_op_data *op_data,
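mdc_enqueue_async() follows the usual ptlrpc asynchronous-request pattern:
per-request state lives in the request's async-args area and a reply-interpret
callback runs in ptlrpcd context. A stripped-down sketch of that pattern (the
my_* names are hypothetical):

struct my_args {
        struct ldlm_lock *ma_lock;      /* state the reply handler needs */
};

static int my_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                        void *args, int rc)
{
        struct my_args *ma = args;

        /* runs from ptlrpcd when the reply (or an error) arrives */
        CDEBUG(D_INFO, "lock %p: rc = %d\n", ma->ma_lock, rc);
        return rc;
}

static void my_send(struct ptlrpc_request *req, struct ldlm_lock *lock)
{
        struct my_args *ma = ptlrpc_req_async_args(ma, req);

        ma->ma_lock = lock;
        req->rq_interpret_reply = my_interpret;
        ptlrpcd_add_req(req);   /* queue and return without waiting */
}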
index 6eee865..05ee41e 100644 (file)
@@ -3103,6 +3103,7 @@ static const struct md_ops mdc_md_ops = {
        .m_close            = mdc_close,
        .m_create           = mdc_create,
        .m_enqueue          = mdc_enqueue,
+       .m_enqueue_async    = mdc_enqueue_async,
        .m_getattr          = mdc_getattr,
        .m_getattr_name     = mdc_getattr_name,
        .m_intent_lock      = mdc_intent_lock,
index a11e826..9f35021 100644 (file)
@@ -1614,6 +1614,7 @@ static const char * const mps_stats[] = {
        [LPROC_MD_CLOSE]                = "close",
        [LPROC_MD_CREATE]               = "create",
        [LPROC_MD_ENQUEUE]              = "enqueue",
+       [LPROC_MD_ENQUEUE_ASYNC]        = "enqueue_async",
        [LPROC_MD_GETATTR]              = "getattr",
        [LPROC_MD_INTENT_LOCK]          = "intent_lock",
        [LPROC_MD_LINK]                 = "link",