LU-925 agl: async glimpse lock process in CLIO stack
author Fan Yong <yong.fan@whamcloud.com>
Fri, 6 Jan 2012 06:49:42 +0000 (14:49 +0800)
committer Oleg Drokin <green@whamcloud.com>
Tue, 10 Jan 2012 18:03:09 +0000 (13:03 -0500)
Adjust the CLIO lock state machine to support:
1. unuse of a lock in non-hold state.
2. re-enqueue of a non-granted glimpse lock.

Signed-off-by: Fan Yong <yong.fan@whamcloud.com>
Change-Id: I9de8939a398d7b4c7062e6c5859bca06deddd089
Reviewed-on: http://review.whamcloud.com/1243
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Tested-by: Hudson
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
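
The second point above, re-enqueuing a non-granted glimpse lock, is carried by the new CLO_REENQUEUED transition: when a sub-lock reports it, the "again:" loop added to lov_lock_wait() in lustre/lov/lov_lock.c clears the code, counts it, and rescans all sub-locks; since each sub-lock can be re-enqueued only once, the rescan terminates. A minimal, self-contained C sketch of that aggregation loop follows; the toy_* names and the simulated sub-lock behaviour are assumptions for illustration, not Lustre code.

/* Toy model of the bounded re-enqueue handling added to lov_lock_wait():
 * each sub-lock may report CLO_REENQUEUED at most once, so the outer
 * rescan loop cannot spin forever.  Illustrative only. */
#include <stdio.h>

enum { CLO_OK = 0, CLO_WAIT = 1, CLO_REPEAT = 2, CLO_REENQUEUED = 3 };

struct toy_sublock {
        int granted;        /* 1 once the (simulated) lock is granted */
        int reenqueued;     /* set after the single allowed re-enqueue */
};

/* A non-granted sub-lock re-enqueues once; here the second enqueue is
 * assumed to succeed, so the next scan sees it granted. */
static int toy_sublock_wait(struct toy_sublock *s)
{
        if (s->granted)
                return CLO_OK;
        if (!s->reenqueued) {
                s->reenqueued = 1;
                s->granted = 1;
                return CLO_REENQUEUED;
        }
        return CLO_WAIT;
}

/* Mirrors the "again:" aggregation added to the top-lock wait path:
 * count re-enqueued sub-locks, treat them as success for this pass,
 * and rescan once any sub-lock re-enqueued and no error occurred. */
static int toy_toplock_wait(struct toy_sublock *subs, int nr)
{
        int result, reenqueued, i;
again:
        for (result = 0, reenqueued = 0, i = 0; i < nr; i++) {
                int rc = toy_sublock_wait(&subs[i]);
                if (rc == CLO_REENQUEUED) {
                        reenqueued++;
                        rc = 0;
                }
                if (rc != 0) {
                        result = rc;
                        break;
                }
        }
        /* Each sub-lock can be re-enqueued only once, so this is bounded. */
        if (result == 0 && reenqueued != 0)
                goto again;
        return result;
}

int main(void)
{
        struct toy_sublock subs[3] = { { 1, 0 }, { 0, 0 }, { 1, 0 } };
        printf("top-lock wait result: %d\n", toy_toplock_wait(subs, 3));
        return 0;
}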
17 files changed:
lustre/include/cl_object.h
lustre/include/lclient.h
lustre/include/lustre_dlm.h
lustre/include/obd_ost.h
lustre/include/obd_support.h
lustre/lclient/glimpse.c
lustre/lclient/lcommon_cl.c
lustre/ldlm/ldlm_lock.c
lustre/llite/statahead.c
lustre/lov/lov_lock.c
lustre/obdclass/cl_lock.c
lustre/obdfilter/filter.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_internal.h
lustre/osc/osc_lock.c
lustre/osc/osc_request.c
lustre/tests/sanity.sh

lustre/include/cl_object.h
index cae226b..b4c9bd8 100644 (file)
@@ -1616,9 +1616,11 @@ struct cl_lock_slice {
  */
 enum cl_lock_transition {
         /** operation cannot be completed immediately. Wait for state change. */
-        CLO_WAIT   = 1,
+        CLO_WAIT        = 1,
         /** operation had to release lock mutex, restart. */
-        CLO_REPEAT = 2
+        CLO_REPEAT      = 2,
+        /** lower layer re-enqueued. */
+        CLO_REENQUEUED  = 3,
 };
 
 /**
@@ -2155,9 +2157,17 @@ enum cl_enq_flags {
          */
         CEF_NEVER        = 0x00000010,
         /**
+         * for async glimpse lock.
+         */
+        CEF_AGL          = 0x00000020,
+        /**
+         * do not trigger re-enqueue.
+         */
+        CEF_NO_REENQUEUE = 0x00000040,
+        /**
          * mask of enq_flags.
          */
-        CEF_MASK         = 0x0000001f
+        CEF_MASK         = 0x0000007f,
 };
 
 /**
lustre/include/lclient.h
index dd6721a..68b49e1 100644 (file)
@@ -28,6 +28,8 @@
 /*
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 blkcnt_t dirty_cnt(struct inode *inode);
 
-int cl_glimpse_size(struct inode *inode);
+int cl_glimpse_size0(struct inode *inode, int agl);
 int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
-                    struct inode *inode, struct cl_object *clob);
+                    struct inode *inode, struct cl_object *clob, int agl);
+
+static inline int cl_glimpse_size(struct inode *inode)
+{
+        return cl_glimpse_size0(inode, 0);
+}
+
+static inline int cl_agl(struct inode *inode)
+{
+        return cl_glimpse_size0(inode, 1);
+}
 
 /**
  * Locking policy for setattr.
lustre/include/lustre_dlm.h
index cf9c70d..1d4a2a7 100644 (file)
@@ -122,14 +122,15 @@ typedef enum {
 #define LDLM_FL_HAS_INTENT     0x001000 /* lock request has intent */
 #define LDLM_FL_CANCELING      0x002000 /* lock cancel has already been sent */
 #define LDLM_FL_LOCAL          0x004000 /* local lock (ie, no srv/cli split) */
-/* was LDLM_FL_WARN  until 2.0.0  0x008000 */
 #define LDLM_FL_DISCARD_DATA   0x010000 /* discard (no writeback) on cancel */
 
 #define LDLM_FL_NO_TIMEOUT     0x020000 /* Blocked by group lock - wait
                                          * indefinitely */
 
 /* file & record locking */
-#define LDLM_FL_BLOCK_NOWAIT   0x040000 // server told not to wait if blocked
+#define LDLM_FL_BLOCK_NOWAIT   0x040000 /* server told not to wait if blocked.
+                                         * For AGL, OST will not send glimpse
+                                         * callback. */
 #define LDLM_FL_TEST_LOCK      0x080000 // return blocking lock
 
 /* XXX FIXME: This is being added to b_size as a low-risk fix to the fact that
@@ -176,8 +177,6 @@ typedef enum {
  * w/o involving separate thread. in order to decrease cs rate */
 #define LDLM_FL_ATOMIC_CB      0x4000000
 
-/* was LDLM_FL_ASYNC until 2.0.0 0x8000000 */
-
 /* It may happen that a client initiate 2 operations, e.g. unlink and mkdir,
  * such that server send blocking ast for conflict locks to this client for
  * the 1st operation, whereas the 2nd operation has canceled this lock and
@@ -736,6 +735,7 @@ struct ldlm_lock {
          * Client-side-only members.
          */
 
+        int                   l_fail_value;
         /**
          * Temporary storage for an LVB received during an enqueue operation.
          */
@@ -763,6 +763,7 @@ struct ldlm_lock {
          */
         __u32                 l_pid;
 
+        int                   l_bl_ast_run;
         /**
          * For ldlm_add_ast_work_item().
          */
@@ -777,7 +778,6 @@ struct ldlm_lock {
         cfs_list_t            l_rk_ast;
 
         struct ldlm_lock     *l_blocking_lock;
-        int                   l_bl_ast_run;
 
         /**
          * Protected by lr_lock, linkages to "skip lists".
@@ -1078,6 +1078,8 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
 int  ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
+void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc);
+void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc);
 void ldlm_lock_allow_match(struct ldlm_lock *lock);
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock);
 ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
lustre/include/obd_ost.h
index 81b5f44..08b470a 100644 (file)
@@ -77,6 +77,7 @@ struct osc_enqueue_args {
         struct ost_lvb           *oa_lvb;
         struct lustre_handle     *oa_lockh;
         struct ldlm_enqueue_info *oa_ei;
+        unsigned int              oa_agl:1;
 };
 
 #if 0
lustre/include/obd_support.h
index 739f2e0..91678ec 100644 (file)
@@ -28,6 +28,8 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -313,6 +315,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LDLM_INTR_CP_AST        0x317
 #define OBD_FAIL_LDLM_CP_BL_RACE         0x318
 #define OBD_FAIL_LDLM_NEW_LOCK           0x319
+#define OBD_FAIL_LDLM_AGL_DELAY          0x31a
+#define OBD_FAIL_LDLM_AGL_NOLOCK         0x31b
 
 /* LOCKLESS IO */
 #define OBD_FAIL_LDLM_SET_CONTENTION     0x385
lustre/lclient/glimpse.c
index c95ccce..a4f7f13 100644 (file)
@@ -105,7 +105,7 @@ blkcnt_t dirty_cnt(struct inode *inode)
 }
 
 int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
-                    struct inode *inode, struct cl_object *clob)
+                    struct inode *inode, struct cl_object *clob, int agl)
 {
         struct cl_lock_descr *descr = &ccc_env_info(env)->cti_descr;
         struct cl_inode_info *lli   = cl_i2info(inode);
@@ -136,6 +136,8 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
                         descr->cld_obj   = clob;
                         descr->cld_mode  = CLM_PHANTOM;
                         descr->cld_enq_flags = CEF_ASYNC | CEF_MUST;
+                        if (agl)
+                                descr->cld_enq_flags |= CEF_AGL;
                         cio->cui_glimpse = 1;
                         /*
                          * CEF_ASYNC is used because glimpse sub-locks cannot
@@ -149,9 +151,13 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
                                                cfs_current());
                         cio->cui_glimpse = 0;
 
+                        if (lock == NULL)
+                                RETURN(0);
+
                         if (IS_ERR(lock))
                                 RETURN(PTR_ERR(lock));
 
+                        LASSERT(agl == 0);
                         result = cl_wait(env, lock);
                         if (result == 0) {
                                 cl_merge_lvb(inode);
@@ -200,7 +206,7 @@ static int cl_io_get(struct inode *inode, struct lu_env **envout,
         return result;
 }
 
-int cl_glimpse_size(struct inode *inode)
+int cl_glimpse_size0(struct inode *inode, int agl)
 {
         /*
          * We don't need ast_flags argument to cl_glimpse_size(), because
@@ -229,7 +235,8 @@ int cl_glimpse_size(struct inode *inode)
                          */
                         result = io->ci_result;
                 else if (result == 0)
-                        result = cl_glimpse_lock(env, io, inode, io->ci_obj);
+                        result = cl_glimpse_lock(env, io, inode, io->ci_obj,
+                                                 agl);
                 cl_io_fini(env, io);
                 cl_env_put(env, &refcheck);
         }
lustre/lclient/lcommon_cl.c
index 792dc2c..01b8459 100644 (file)
@@ -894,7 +894,7 @@ int ccc_prep_size(const struct lu_env *env, struct cl_object *obj,
                          * of the buffer (C)
                          */
                         ccc_object_size_unlock(obj);
-                        result = cl_glimpse_lock(env, io, inode, obj);
+                        result = cl_glimpse_lock(env, io, inode, obj, 0);
                         if (result == 0 && exceed != NULL) {
                                 /* If objective page index exceed end-of-file
                                  * page index, return directly. Do not expect
lustre/ldlm/ldlm_lock.c
index eaaaa43..9d95c13 100644 (file)
@@ -1051,7 +1051,8 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue,
                         continue;
 
                 if (!unref &&
-                    (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED)))
+                    (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
+                     lock->l_fail_value != 0))
                         continue;
 
                 if ((flags & LDLM_FL_LOCAL_ONLY) &&
@@ -1071,6 +1072,23 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue,
         return NULL;
 }
 
+void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc)
+{
+        if (lock->l_fail_value == 0) {
+                lock->l_fail_value = rc;
+                cfs_waitq_signal(&lock->l_waitq);
+        }
+}
+EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
+
+void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc)
+{
+        lock_res_and_lock(lock);
+        ldlm_lock_fail_match_locked(lock, rc);
+        unlock_res_and_lock(lock);
+}
+EXPORT_SYMBOL(ldlm_lock_fail_match);
+
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
 {
         lock->l_flags |= LDLM_FL_LVB_READY;
@@ -1183,7 +1201,16 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 
                         /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                         l_wait_event(lock->l_waitq,
-                                     (lock->l_flags & LDLM_FL_LVB_READY), &lwi);
+                                     lock->l_flags & LDLM_FL_LVB_READY ||
+                                     lock->l_fail_value != 0,
+                                     &lwi);
+                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
+                                if (flags & LDLM_FL_TEST_LOCK)
+                                        LDLM_LOCK_RELEASE(lock);
+                                else
+                                        ldlm_lock_decref_internal(lock, mode);
+                                rc = 0;
+                        }
                 }
         }
  out2:
@@ -1231,7 +1258,8 @@ ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
         lock = ldlm_handle2lock(lockh);
         if (lock != NULL) {
                 lock_res_and_lock(lock);
-                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED)
+                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
+                    lock->l_fail_value != 0)
                         GOTO(out, mode);
 
                 if (lock->l_flags & LDLM_FL_CBPENDING &&
lustre/llite/statahead.c
index 537b44d..4f75f43 100644 (file)
@@ -571,10 +571,6 @@ static void ll_sai_put(struct ll_statahead_info *sai)
         EXIT;
 }
 
-#ifndef HAVE_AGL_SUPPORT
-# define cl_agl(inode)  do {} while (0)
-#endif
-
 /* Do NOT forget to drop inode refcount when into sai_entries_agl. */
 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
 {
lustre/lov/lov_lock.c
index d633309..27310d1 100644 (file)
@@ -28,6 +28,8 @@
 /*
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -267,12 +269,12 @@ static int lov_subresult(int result, int rc)
         int result_rank;
         int rc_rank;
 
+        ENTRY;
+
         LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
         LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
         CLASSERT(CLO_WAIT < CLO_REPEAT);
 
-        ENTRY;
-
         /* calculate ranks in the ordering above */
         result_rank = result < 0 ? 1 + CLO_REPEAT : result;
         rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
@@ -524,7 +526,7 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
 
         /* first, try to enqueue a sub-lock ... */
         result = cl_enqueue_try(env, sublock, io, enqflags);
-        if (sublock->cll_state == CLS_ENQUEUED)
+        if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL))
                 /* if it is enqueued, try to `wait' on it---maybe it's already
                  * granted */
                 result = cl_wait_try(env, sublock);
@@ -533,8 +535,8 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
          * parallel, otherwise---enqueue has to wait until sub-lock is granted
          * before proceeding to the next one.
          */
-        if (result == CLO_WAIT && sublock->cll_state <= CLS_HELD &&
-            enqflags & CEF_ASYNC && !last)
+        if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
+            (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
                 result = 0;
         RETURN(result);
 }
@@ -697,7 +699,21 @@ static int lov_lock_unuse(const struct lu_env *env,
                 rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         if (lls->sub_flags & LSF_HELD) {
-                                LASSERT(sublock->cll_state == CLS_HELD);
+                                LASSERT(sublock->cll_state == CLS_HELD ||
+                                        sublock->cll_state == CLS_ENQUEUED);
+                                /* For AGL case, the sublock state maybe not
+                                 * match the lower layer state, so sync them
+                                 * before unuse. */
+                                if (sublock->cll_users == 1 &&
+                                    sublock->cll_state == CLS_ENQUEUED) {
+                                        __u32 save;
+
+                                        save = sublock->cll_descr.cld_enq_flags;
+                                        sublock->cll_descr.cld_enq_flags |=
+                                                        CEF_NO_REENQUEUE;
+                                        cl_wait_try(env, sublock);
+                                        sublock->cll_descr.cld_enq_flags = save;
+                                }
                                 rc = cl_unuse_try(subenv->lse_env, sublock);
                                 rc = lov_sublock_release(env, lck, i, 0, rc);
                         }
@@ -789,12 +805,15 @@ static int lov_lock_wait(const struct lu_env *env,
         struct lov_lock        *lck     = cl2lov_lock(slice);
         struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
         enum cl_lock_state      minstate;
+        int                     reenqueued;
         int                     result;
         int                     i;
 
         ENTRY;
 
-        for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
+again:
+        for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
+             i < lck->lls_nr; ++i) {
                 int rc;
                 struct lovsub_lock     *sub;
                 struct cl_lock         *sublock;
@@ -814,10 +833,18 @@ static int lov_lock_wait(const struct lu_env *env,
                         minstate = min(minstate, sublock->cll_state);
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
+                if (rc == CLO_REENQUEUED) {
+                        reenqueued++;
+                        rc = 0;
+                }
                 result = lov_subresult(result, rc);
                 if (result != 0)
                         break;
         }
+        /* Each sublock only can be reenqueued once, so will not loop for
+         * ever. */
+        if (result == 0 && reenqueued != 0)
+                goto again;
         cl_lock_closure_fini(closure);
         RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT);
 }
@@ -863,6 +890,11 @@ static int lov_lock_use(const struct lu_env *env,
                                 if (rc != 0)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 1, rc);
+                        } else if (sublock->cll_state == CLS_NEW) {
+                                /* Sub-lock might have been canceled, while
+                                 * top-lock was cached. */
+                                result = -ESTALE;
+                                lov_sublock_release(env, lck, i, 1, result);
                         }
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
lustre/obdclass/cl_lock.c
index b8096b7..7d8fc62 100644 (file)
@@ -925,7 +925,6 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
         EXIT;
 }
 
-
 /**
  * Waits until lock state is changed.
  *
@@ -1303,7 +1302,8 @@ static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
                 cl_lock_user_del(env, lock);
                 cl_lock_error(env, lock, result);
         }
-        LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
+        LASSERT(ergo(result == 0 && !(enqflags & CEF_AGL),
+                     lock->cll_state == CLS_ENQUEUED ||
                      lock->cll_state == CLS_HELD));
         RETURN(result);
 }
@@ -2150,25 +2150,34 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
         ENTRY;
         do {
                 lock = cl_lock_hold_mutex(env, io, need, scope, source);
-                if (!IS_ERR(lock)) {
-                        rc = cl_enqueue_locked(env, lock, io, enqflags);
-                        if (rc == 0) {
-                                if (cl_lock_fits_into(env, lock, need, io)) {
+                if (IS_ERR(lock))
+                        break;
+
+                rc = cl_enqueue_locked(env, lock, io, enqflags);
+                if (rc == 0) {
+                        if (cl_lock_fits_into(env, lock, need, io)) {
+                                if (!(enqflags & CEF_AGL)) {
                                         cl_lock_mutex_put(env, lock);
-                                        cl_lock_lockdep_acquire(env,
-                                                                lock, enqflags);
+                                        cl_lock_lockdep_acquire(env, lock,
+                                                                enqflags);
                                         break;
                                 }
-                                cl_unuse_locked(env, lock);
+                                rc = 1;
                         }
-                        cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
-                        cl_lock_hold_release(env, lock, scope, source);
-                        cl_lock_mutex_put(env, lock);
-                        lu_ref_del(&lock->cll_reference, scope, source);
-                        cl_lock_put(env, lock);
+                        cl_unuse_locked(env, lock);
+                }
+                cl_lock_trace(D_DLMTRACE, env,
+                              rc <= 0 ? "enqueue failed" : "agl succeed", lock);
+                cl_lock_hold_release(env, lock, scope, source);
+                cl_lock_mutex_put(env, lock);
+                lu_ref_del(&lock->cll_reference, scope, source);
+                cl_lock_put(env, lock);
+                if (rc > 0) {
+                        LASSERT(enqflags & CEF_AGL);
+                        lock = NULL;
+                } else if (rc != 0) {
                         lock = ERR_PTR(rc);
-                } else
-                        rc = PTR_ERR(lock);
+                }
         } while (rc == 0);
         RETURN(lock);
 }
lustre/obdfilter/filter.c
index ffd551e..9ba4d36 100644 (file)
@@ -1693,7 +1693,6 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                                 struct ldlm_lock **lockp, void *req_cookie,
                                 ldlm_mode_t mode, int flags, void *data)
 {
-        CFS_LIST_HEAD(rpc_list);
         struct ptlrpc_request *req = req_cookie;
         struct ldlm_lock *lock = *lockp, *l = NULL;
         struct ldlm_resource *res = lock->l_resource;
@@ -1732,24 +1731,18 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
          * lock, and should not be granted if the lock will be blocked.
          */
 
+        if (flags & LDLM_FL_BLOCK_NOWAIT) {
+                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5);
+
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK))
+                        RETURN(ELDLM_LOCK_ABORTED);
+        }
+
         LASSERT(ns == ldlm_res_to_ns(res));
         lock_res(res);
-        rc = policy(lock, &tmpflags, 0, &err, &rpc_list);
+        rc = policy(lock, &tmpflags, 0, &err, NULL);
         check_res_locked(res);
 
-        /* FIXME: we should change the policy function slightly, to not make
-         * this list at all, since we just turn around and free it */
-        while (!cfs_list_empty(&rpc_list)) {
-                struct ldlm_lock *wlock =
-                        cfs_list_entry(rpc_list.next, struct ldlm_lock,
-                                       l_cp_ast);
-                LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
-                LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
-                lock->l_flags &= ~LDLM_FL_CP_REQD;
-                cfs_list_del_init(&wlock->l_cp_ast);
-                LDLM_LOCK_RELEASE(wlock);
-        }
-
         /* The lock met with no resistance; we're finished. */
         if (rc == LDLM_ITER_CONTINUE) {
                 /* do not grant locks to the liblustre clients: they cannot
@@ -1766,6 +1759,12 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                 }
                 unlock_res(res);
                 RETURN(err);
+        } else if (flags & LDLM_FL_BLOCK_NOWAIT) {
+                /* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse
+                 * callback for glimpse size. The real size user will trigger
+                 * the glimpse callback when necessary. */
+                unlock_res(res);
+                RETURN(ELDLM_LOCK_ABORTED);
         }
 
         /* Do not grant any lock, but instead send GL callbacks.  The extent
lustre/osc/osc_cl_internal.h
index 6478c26..b0fbef6 100644 (file)
@@ -253,7 +253,11 @@ struct osc_lock {
          * granted.
          * Glimpse lock should be destroyed immediately after use.
          */
-                                 ols_glimpse:1;
+                                 ols_glimpse:1,
+        /**
+         * For async glimpse lock.
+         */
+                                 ols_agl:1;
         /**
          * IO that owns this lock. This field is used for a dead-lock
          * avoidance by osc_lock_enqueue_wait().
lustre/osc/osc_internal.h
index 8e79f7a..1236240 100644 (file)
@@ -132,7 +132,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                      obd_enqueue_update_f upcall,
                      void *cookie, struct ldlm_enqueue_info *einfo,
                      struct lustre_handle *lockh,
-                     struct ptlrpc_request_set *rqset, int async);
+                     struct ptlrpc_request_set *rqset, int async, int agl);
 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode);
 
 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
lustre/osc/osc_lock.c
index 42dd2c4..9aa7f59 100644 (file)
@@ -196,23 +196,32 @@ static int osc_lock_unuse(const struct lu_env *env,
 {
         struct osc_lock *ols = cl2osc_lock(slice);
 
-        LASSERT(ols->ols_state == OLS_GRANTED ||
-                ols->ols_state == OLS_UPCALL_RECEIVED);
         LINVRNT(osc_lock_invariant(ols));
 
-        if (ols->ols_glimpse) {
-                LASSERT(ols->ols_hold == 0);
+        switch (ols->ols_state) {
+        case OLS_NEW:
+                LASSERT(!ols->ols_hold);
+                LASSERT(ols->ols_agl);
+                return 0;
+        case OLS_UPCALL_RECEIVED:
+                LASSERT(!ols->ols_hold);
+                ols->ols_state = OLS_NEW;
                 return 0;
+        case OLS_GRANTED:
+                LASSERT(!ols->ols_glimpse);
+                LASSERT(ols->ols_hold);
+                /*
+                 * Move lock into OLS_RELEASED state before calling
+                 * osc_cancel_base() so that possible synchronous cancellation
+                 * (that always happens e.g., for liblustre) sees that lock is
+                 * released.
+                 */
+                ols->ols_state = OLS_RELEASED;
+                return osc_lock_unhold(ols);
+        default:
+                CERROR("Impossible state: %d\n", ols->ols_state);
+                LBUG();
         }
-        LASSERT(ols->ols_hold);
-
-        /*
-         * Move lock into OLS_RELEASED state before calling osc_cancel_base()
-         * so that possible synchronous cancellation (that always happens
-         * e.g., for liblustre) sees that lock is released.
-         */
-        ols->ols_state = OLS_RELEASED;
-        return osc_lock_unhold(ols);
 }
 
 static void osc_lock_fini(const struct lu_env *env,
@@ -346,10 +355,8 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
 
         ENTRY;
 
-        if (!(olck->ols_flags & LDLM_FL_LVB_READY)) {
-                EXIT;
-                return;
-        }
+        if (!(olck->ols_flags & LDLM_FL_LVB_READY))
+                RETURN_EXIT;
 
         lvb   = &olck->ols_lvb;
         obj   = olck->ols_cl.cls_obj;
@@ -528,6 +535,7 @@ static int osc_lock_upcall(void *cookie, int errcode)
                                 dlmlock->l_ast_data = NULL;
                                 olck->ols_handle.cookie = 0ULL;
                                 cfs_spin_unlock(&osc_ast_guard);
+                                ldlm_lock_fail_match_locked(dlmlock, rc);
                                 unlock_res_and_lock(dlmlock);
                                 LDLM_LOCK_PUT(dlmlock);
                         }
@@ -556,17 +564,22 @@ static int osc_lock_upcall(void *cookie, int errcode)
                         rc = 0;
                 }
 
-                if (rc == 0)
-                        /* on error, lock was signaled by cl_lock_error() */
+                if (rc == 0) {
                         cl_lock_signal(env, lock);
-                else
+                        /* del user for lock upcall cookie */
+                        cl_unuse_try(env, lock);
+                } else {
+                        /* del user for lock upcall cookie */
+                        cl_lock_user_del(env, lock);
                         cl_lock_error(env, lock, rc);
+                }
 
                 cl_lock_mutex_put(env, lock);
 
                 /* release cookie reference, acquired by osc_lock_enqueue() */
                 lu_ref_del(&lock->cll_reference, "upcall", lock);
                 cl_lock_put(env, lock);
+
                 cl_env_nested_put(&nest, env);
         } else
                 /* should never happen, similar to osc_ldlm_blocking_ast(). */
@@ -1052,7 +1065,6 @@ static int osc_lock_enqueue_wait(const struct lu_env *env,
         ENTRY;
 
         LASSERT(cl_lock_is_mutexed(lock));
-        LASSERT(lock->cll_state == CLS_QUEUING);
 
         /* make it enqueue anyway for glimpse lock, because we actually
          * don't need to cancel any conflicting locks. */
@@ -1156,10 +1168,14 @@ static int osc_lock_enqueue(const struct lu_env *env,
         ENTRY;
 
         LASSERT(cl_lock_is_mutexed(lock));
-        LASSERT(lock->cll_state == CLS_QUEUING);
-        LASSERT(ols->ols_state == OLS_NEW);
+        LASSERTF(ols->ols_state == OLS_NEW,
+                 "Impossible state: %d\n", ols->ols_state);
 
         ols->ols_flags = osc_enq2ldlm_flags(enqflags);
+        if (enqflags & CEF_AGL) {
+                ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
+                ols->ols_agl = 1;
+        }
         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
                 ols->ols_glimpse = 1;
         if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST))
@@ -1181,6 +1197,8 @@ static int osc_lock_enqueue(const struct lu_env *env,
                         /* a reference for lock, passed as an upcall cookie */
                         cl_lock_get(lock);
                         lu_ref_add(&lock->cll_reference, "upcall", lock);
+                        /* a user for lock also */
+                        cl_lock_user_add(env, lock);
                         ols->ols_state = OLS_ENQUEUED;
 
                         /*
@@ -1196,11 +1214,16 @@ static int osc_lock_enqueue(const struct lu_env *env,
                                           obj->oo_oinfo->loi_kms_valid,
                                           osc_lock_upcall,
                                           ols, einfo, &ols->ols_handle,
-                                          PTLRPCD_SET, 1);
+                                          PTLRPCD_SET, 1, ols->ols_agl);
                         if (result != 0) {
+                                cl_lock_user_del(env, lock);
                                 lu_ref_del(&lock->cll_reference,
                                            "upcall", lock);
                                 cl_lock_put(env, lock);
+                                if (unlikely(result == -ECANCELED)) {
+                                        ols->ols_state = OLS_NEW;
+                                        result = 0;
+                                }
                         }
                 } else {
                         ols->ols_state = OLS_GRANTED;
@@ -1218,8 +1241,34 @@ static int osc_lock_wait(const struct lu_env *env,
         struct cl_lock  *lock = olck->ols_cl.cls_lock;
 
         LINVRNT(osc_lock_invariant(olck));
-        if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED)
-                return 0;
+
+        if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
+                if (olck->ols_flags & LDLM_FL_LVB_READY) {
+                        return 0;
+                } else if (olck->ols_agl) {
+                        olck->ols_state = OLS_NEW;
+                } else {
+                        LASSERT(lock->cll_error);
+                        return lock->cll_error;
+                }
+        }
+
+        if (olck->ols_state == OLS_NEW) {
+                if (lock->cll_descr.cld_enq_flags & CEF_NO_REENQUEUE) {
+                        return -ENAVAIL;
+                } else {
+                        int rc;
+
+                        LASSERT(olck->ols_agl);
+
+                        rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC |
+                                                                CEF_MUST);
+                        if (rc != 0)
+                                return rc;
+                        else
+                                return CLO_REENQUEUED;
+                }
+        }
 
         LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
                      lock->cll_error == 0, olck->ols_lock != NULL));
@@ -1337,6 +1386,7 @@ static void osc_lock_cancel(const struct lu_env *env,
                                       lock, result);
         }
         olck->ols_state = OLS_CANCELLED;
+        olck->ols_flags &= ~LDLM_FL_LVB_READY;
         osc_lock_detach(env, olck);
 }
 
@@ -1475,6 +1525,9 @@ static int osc_lock_fits_into(const struct lu_env *env,
                 return 0;
 
         if (need->cld_mode == CLM_PHANTOM) {
+                if (ols->ols_agl)
+                        return !(ols->ols_state > OLS_RELEASED);
+
                 /*
                  * Note: the QUEUED lock can't be matched here, otherwise
                  * it might cause the deadlocks.
lustre/osc/osc_request.c
index f4f2c7a..1f25a97 100644 (file)
@@ -3178,7 +3178,7 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
 
 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                             obd_enqueue_update_f upcall, void *cookie,
-                            int *flags, int rc)
+                            int *flags, int agl, int rc)
 {
         int intent = *flags & LDLM_FL_HAS_INTENT;
         ENTRY;
@@ -3196,7 +3196,8 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                 }
         }
 
-        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
+        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
+            (rc == 0)) {
                 *flags |= LDLM_FL_LVB_READY;
                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
@@ -3214,6 +3215,9 @@ static int osc_enqueue_interpret(const struct lu_env *env,
         struct ldlm_lock *lock;
         struct lustre_handle handle;
         __u32 mode;
+        struct ost_lvb *lvb;
+        __u32 lvb_len;
+        int *flags = aa->oa_flags;
 
         /* Make a local copy of a lock handle and a mode, because aa->oa_*
          * might be freed anytime after lock upcall has been called. */
@@ -3233,13 +3237,20 @@ static int osc_enqueue_interpret(const struct lu_env *env,
         /* Let CP AST to grant the lock first. */
         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
 
+        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
+                lvb = NULL;
+                lvb_len = 0;
+        } else {
+                lvb = aa->oa_lvb;
+                lvb_len = sizeof(*aa->oa_lvb);
+        }
+
         /* Complete obtaining the lock procedure. */
         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
-                                   mode, aa->oa_flags, aa->oa_lvb,
-                                   sizeof(*aa->oa_lvb), &handle, rc);
+                                   mode, flags, lvb, lvb_len, &handle, rc);
         /* Complete osc stuff. */
-        rc = osc_enqueue_fini(req, aa->oa_lvb,
-                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
+        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
+                              flags, aa->oa_agl, rc);
 
         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 
@@ -3263,8 +3274,9 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                         struct lov_oinfo *loi, int flags,
                         struct ost_lvb *lvb, __u32 mode, int rc)
 {
+        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
+
         if (rc == ELDLM_OK) {
-                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
                 __u64 tmp;
 
                 LASSERT(lock != NULL);
@@ -3285,13 +3297,21 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                                    lock->l_policy_data.l_extent.end);
                 }
                 ldlm_lock_allow_match(lock);
-                LDLM_LOCK_PUT(lock);
         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
+                LASSERT(lock != NULL);
                 loi->loi_lvb = *lvb;
+                ldlm_lock_allow_match(lock);
                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                 rc = ELDLM_OK;
         }
                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                 rc = ELDLM_OK;
         }
+
+        if (lock != NULL) {
+                if (rc != ELDLM_OK)
+                        ldlm_lock_fail_match(lock, rc);
+
+                LDLM_LOCK_PUT(lock);
+        }
 }
 EXPORT_SYMBOL(osc_update_enqueue);
 
@@ -3310,11 +3330,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ldlm_enqueue_info *einfo,
                      struct lustre_handle *lockh,
-                     struct ptlrpc_request_set *rqset, int async)
+                     struct ptlrpc_request_set *rqset, int async, int agl)
 {
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req = NULL;
         int intent = *flags & LDLM_FL_HAS_INTENT;
+        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
         ldlm_mode_t mode;
         int rc;
         ENTRY;
@@ -3348,13 +3369,20 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
         mode = einfo->ei_mode;
         if (einfo->ei_mode == LCK_PR)
                 mode |= LCK_PW;
-        mode = ldlm_lock_match(obd->obd_namespace,
-                               *flags | LDLM_FL_LVB_READY, res_id,
+        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                                einfo->ei_type, policy, mode, lockh, 0);
         if (mode) {
                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
 
-                if (osc_set_lock_data_with_check(matched, einfo)) {
+                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
+                        /* For AGL, if enqueue RPC is sent but the lock is not
+                         * granted, then skip to process this strpe.
+                         * Return -ECANCELED to tell the caller. */
+                        ldlm_lock_decref(lockh, mode);
+                        LDLM_LOCK_PUT(matched);
+                        RETURN(-ECANCELED);
+                } else if (osc_set_lock_data_with_check(matched, einfo)) {
+                        *flags |= LDLM_FL_LVB_READY;
                         /* addref the lock only if not async requests and PW
                          * lock is matched whereas we asked for PR. */
                         if (!rqset && einfo->ei_mode != mode)
@@ -3368,16 +3396,17 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                         /* We already have a lock, and it's referenced */
                         (*upcall)(cookie, ELDLM_OK);
 
-                        /* For async requests, decref the lock. */
                         if (einfo->ei_mode != mode)
                                 ldlm_lock_decref(lockh, LCK_PW);
                         else if (rqset)
+                                /* For async requests, decref the lock. */
                                 ldlm_lock_decref(lockh, einfo->ei_mode);
                         LDLM_LOCK_PUT(matched);
                         RETURN(ELDLM_OK);
-                } else
+                } else {
                         ldlm_lock_decref(lockh, mode);
-                LDLM_LOCK_PUT(matched);
+                        LDLM_LOCK_PUT(matched);
+                }
         }
 
  no_match:
@@ -3416,6 +3445,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                         aa->oa_cookie = cookie;
                         aa->oa_lvb    = lvb;
                         aa->oa_lockh  = lockh;
+                        aa->oa_agl    = !!agl;
 
                         req->rq_interpret_reply =
                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
@@ -3429,7 +3459,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                 RETURN(rc);
         }
 
-        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
+        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
         if (intent)
                 ptlrpc_req_finished(req);
 
@@ -3451,7 +3481,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
-                              rqset, rqset != NULL);
+                              rqset, rqset != NULL, 0);
         RETURN(rc);
 }
 
lustre/tests/sanity.sh
index d426214..a1c47f5 100644 (file)
@@ -8387,6 +8387,50 @@ test_221() {
 }
 run_test 221 "make sure fault and truncate race to not cause OOM"
 
+test_222a () {
+       rm -rf $DIR/$tdir
+       mkdir -p $DIR/$tdir
+       $LFS setstripe -c 1 -i 0 $DIR/$tdir
+       createmany -o $DIR/$tdir/$tfile 10
+       cancel_lru_locks mdc
+       cancel_lru_locks osc
+       #define OBD_FAIL_LDLM_AGL_DELAY           0x31a
+       $LCTL set_param fail_loc=0x31a
+       ls -l $DIR/$tdir > /dev/null || error "AGL for ls failed"
+       $LCTL set_param fail_loc=0
+       rm -r $DIR/$tdir
+}
+run_test 222a "AGL for ls should not trigger CLIO lock failure ================"
+
+test_222b () {
+       rm -rf $DIR/$tdir
+       mkdir -p $DIR/$tdir
+       $LFS setstripe -c 1 -i 0 $DIR/$tdir
+       createmany -o $DIR/$tdir/$tfile 10
+       cancel_lru_locks mdc
+       cancel_lru_locks osc
+       #define OBD_FAIL_LDLM_AGL_DELAY           0x31a
+       $LCTL set_param fail_loc=0x31a
+       rm -r $DIR/$tdir || "AGL for rmdir failed"
+       $LCTL set_param fail_loc=0
+}
+run_test 222b "AGL for rmdir should not trigger CLIO lock failure ============="
+
+test_223 () {
+       rm -rf $DIR/$tdir
+       mkdir -p $DIR/$tdir
+       $LFS setstripe -c 1 -i 0 $DIR/$tdir
+       createmany -o $DIR/$tdir/$tfile 10
+       cancel_lru_locks mdc
+       cancel_lru_locks osc
+       #define OBD_FAIL_LDLM_AGL_NOLOCK          0x31b
+       $LCTL set_param fail_loc=0x31b
+       ls -l $DIR/$tdir > /dev/null || error "reenqueue failed"
+       $LCTL set_param fail_loc=0
+       rm -r $DIR/$tdir
+}
+run_test 223 "osc reenqueue if without AGL lock granted ======================="
+
 #
 # tests that do cleanup/setup should be run at the end
 #