Whamcloud - gitweb
LU-56 ldlm: SMP improvement for ldlm_lock_cancel
authorLiang Zhen <liang@whamcloud.com>
Tue, 19 Jun 2012 14:43:31 +0000 (22:43 +0800)
committerOleg Drokin <green@whamcloud.com>
Fri, 29 Jun 2012 02:52:37 +0000 (22:52 -0400)
ldlm_del_waiting_lock() is always called twice in ldlm_lock_cancel
even the ldlm_lock is not on expiring waiter list, this is bad for
performance because it needs to grab a global lock and disable
softirq.
This patch add a new bit l_waited to ldlm_lock, it's set only if
ldlm_add_waiting_lock() is ever called, and we can bypass
ldlm_del_waiting_lock() in ldlm_lock_cancel() if w/o this bit.

I also adopted patch(32562) on BZ 20509 because it's necessary for
this improvement.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: Ie48d527b8da187a88646aa070be3aa4beb304b1d
Reviewed-on: http://review.whamcloud.com/3141
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Bobi Jam <bobijam@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/l_lock.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c

index a2b9b9d..2722cc0 100644 (file)
@@ -733,6 +733,22 @@ struct ldlm_lock {
          * Protected by lock and resource locks.
          */
                               l_destroyed:1,
+       /*
+        * it's set in lock_res_and_lock() and unset in unlock_res_and_lock().
+        *
+        * NB: compare with check_res_locked(), check this bit is cheaper,
+        * also, spin_is_locked() is deprecated for kernel code, one reason is
+        * because it works only for SMP so user needs add extra macros like
+        * LASSERT_SPIN_LOCKED for uniprocessor kernels.
+        */
+                             l_res_locked:1,
+       /*
+        * it's set once we call ldlm_add_waiting_lock_res_locked()
+        * to start the lock-timeout timer and it will never be reset.
+        *
+        * Protected by lock_res_and_lock().
+        */
+                             l_waited:1,
         /**
          * flag whether this is a server namespace lock.
          */
index 2c0865f..d1de7d1 100644 (file)
  */
 struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock)
 {
-        /* on server-side resource of lock doesn't change */
-        if (!lock->l_ns_srv)
-                cfs_spin_lock(&lock->l_lock);
+       /* on server-side resource of lock doesn't change */
+       if (!lock->l_ns_srv)
+               cfs_spin_lock(&lock->l_lock);
 
-        lock_res(lock->l_resource);
-        return lock->l_resource;
+       lock_res(lock->l_resource);
+
+       lock->l_res_locked = 1;
+       return lock->l_resource;
 }
 
 void unlock_res_and_lock(struct ldlm_lock *lock)
 {
-        /* on server-side resource of lock doesn't change */
-        unlock_res(lock->l_resource);
-        if (!lock->l_ns_srv)
-                cfs_spin_unlock(&lock->l_lock);
+       /* on server-side resource of lock doesn't change */
+       lock->l_res_locked = 0;
+
+       unlock_res(lock->l_resource);
+       if (!lock->l_ns_srv)
+               cfs_spin_unlock(&lock->l_lock);
 }
index cb4c3cb..bb17c8a 100644 (file)
@@ -1782,14 +1782,17 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
                 LBUG();
         }
 
-        ldlm_del_waiting_lock(lock);
+       if (lock->l_waited)
+               ldlm_del_waiting_lock(lock);
 
         /* Releases cancel callback. */
         ldlm_cancel_callback(lock);
 
         /* Yes, second time, just in case it was added again while we were
            running with no res lock in ldlm_cancel_callback */
-        ldlm_del_waiting_lock(lock);
+       if (lock->l_waited)
+               ldlm_del_waiting_lock(lock);
+
         ldlm_resource_unlink_lock(lock);
         ldlm_lock_destroy_nolock(lock);
 
index a605621..b4198b4 100644 (file)
@@ -84,20 +84,6 @@ static inline unsigned int ldlm_get_rq_timeout(void)
         return timeout < 1 ? 1 : timeout;
 }
 
-#ifdef __KERNEL__
-/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
-static cfs_spinlock_t waiting_locks_spinlock;   /* BH lock (timer) */
-static cfs_list_t waiting_locks_list;
-static cfs_timer_t waiting_locks_timer;
-
-static struct expired_lock_thread {
-        cfs_waitq_t               elt_waitq;
-        int                       elt_state;
-        int                       elt_dump;
-        cfs_list_t                elt_expired_locks;
-} expired_lock_thread;
-#endif
-
 #define ELT_STOPPED   0
 #define ELT_READY     1
 #define ELT_TERMINATE 2
@@ -138,7 +124,19 @@ struct ldlm_bl_work_item {
         int                     blwi_mem_pressure;
 };
 
-#ifdef __KERNEL__
+#if defined(HAVE_SERVER_SUPPORT) && defined(__KERNEL__)
+
+/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
+static cfs_spinlock_t waiting_locks_spinlock;   /* BH lock (timer) */
+static cfs_list_t waiting_locks_list;
+static cfs_timer_t waiting_locks_timer;
+
+static struct expired_lock_thread {
+       cfs_waitq_t             elt_waitq;
+       int                     elt_state;
+       int                     elt_dump;
+       cfs_list_t              elt_expired_locks;
+} expired_lock_thread;
 
 static inline int have_expired_locks(void)
 {
@@ -213,6 +211,13 @@ static int expired_lock_main(void *arg)
                                 LDLM_LOCK_RELEASE(lock);
                                 continue;
                         }
+
+                       if (lock->l_destroyed) {
+                               /* release the lock refcount where
+                                * waiting_locks_callback() founds */
+                               LDLM_LOCK_RELEASE(lock);
+                               continue;
+                       }
                         export = class_export_lock_get(lock->l_export, lock);
                         cfs_spin_unlock_bh(&waiting_locks_spinlock);
 
@@ -243,6 +248,7 @@ static int expired_lock_main(void *arg)
 }
 
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds);
 
 /**
  * Check if there is a request in the export request list
@@ -273,9 +279,9 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
 /* This is called from within a timer interrupt and cannot schedule */
 static void waiting_locks_callback(unsigned long unused)
 {
-        struct ldlm_lock *lock;
+       struct ldlm_lock        *lock;
+       int                     need_dump = 0;
 
-repeat:
         cfs_spin_lock_bh(&waiting_locks_spinlock);
         while (!cfs_list_empty(&waiting_locks_list)) {
                 lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
@@ -297,9 +303,16 @@ repeat:
                                    libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
 
                         cfs_list_del_init(&lock->l_pending_chain);
-                        cfs_spin_unlock_bh(&waiting_locks_spinlock);
-                        ldlm_add_waiting_lock(lock);
-                        goto repeat;
+                       if (lock->l_destroyed) {
+                               /* relay the lock refcount decrease to
+                                * expired lock thread */
+                               cfs_list_add(&lock->l_pending_chain,
+                                       &expired_lock_thread.elt_expired_locks);
+                       } else {
+                               __ldlm_add_waiting_lock(lock,
+                                               ldlm_get_enq_timeout(lock));
+                       }
+                       continue;
                 }
 
                 /* if timeout overlaps the activation time of suspended timeouts
@@ -313,9 +326,16 @@ repeat:
                                    libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
 
                         cfs_list_del_init(&lock->l_pending_chain);
-                        cfs_spin_unlock_bh(&waiting_locks_spinlock);
-                        ldlm_add_waiting_lock(lock);
-                        goto repeat;
+                       if (lock->l_destroyed) {
+                               /* relay the lock refcount decrease to
+                                * expired lock thread */
+                               cfs_list_add(&lock->l_pending_chain,
+                                       &expired_lock_thread.elt_expired_locks);
+                       } else {
+                               __ldlm_add_waiting_lock(lock,
+                                               ldlm_get_enq_timeout(lock));
+                       }
+                       continue;
                 }
 
                 /* Check if we need to prolong timeout */
@@ -355,14 +375,15 @@ repeat:
                 cfs_list_del(&lock->l_pending_chain);
                 cfs_list_add(&lock->l_pending_chain,
                              &expired_lock_thread.elt_expired_locks);
-        }
+               need_dump = 1;
+       }
 
-        if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) {
-                if (obd_dump_on_timeout)
-                        expired_lock_thread.elt_dump = __LINE__;
+       if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) {
+               if (obd_dump_on_timeout && need_dump)
+                       expired_lock_thread.elt_dump = __LINE__;
 
-                cfs_waitq_signal(&expired_lock_thread.elt_waitq);
-        }
+               cfs_waitq_signal(&expired_lock_thread.elt_waitq);
+       }
 
         /*
          * Make sure the timer will fire again if we have any locks
@@ -420,10 +441,14 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
 
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
-        int ret;
-        int timeout = ldlm_get_enq_timeout(lock);
+       int ret;
+       int timeout = ldlm_get_enq_timeout(lock);
 
-        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+       /* NB: must be called with hold of lock_res_and_lock() */
+       LASSERT(lock->l_res_locked);
+       lock->l_waited = 1;
+
+       LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
 
         cfs_spin_lock_bh(&waiting_locks_spinlock);
         if (lock->l_destroyed) {
@@ -553,7 +578,8 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
         LDLM_DEBUG(lock, "refreshed");
         return 1;
 }
-#else /* !__KERNEL__ */
+
+#else /* !HAVE_SERVER_SUPPORT ||  !__KERNEL__ */
 
 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 {
@@ -564,16 +590,19 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
 {
         RETURN(0);
 }
-#endif /* __KERNEL__ */
 
-#ifdef HAVE_SERVER_SUPPORT
-# ifndef __KERNEL__
+# ifdef HAVE_SERVER_SUPPORT
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
-        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
-        RETURN(1);
+       LASSERT(lock->l_res_locked);
+       LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+       RETURN(1);
 }
+
 # endif
+#endif /* HAVE_SERVER_SUPPORT && __KERNEL__ */
+
+#ifdef HAVE_SERVER_SUPPORT
 
 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                             const char *ast_type)
@@ -772,22 +801,23 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         req->rq_interpret_reply = ldlm_cb_interpret;
         req->rq_no_resend = 1;
 
-        lock_res(lock->l_resource);
-        if (lock->l_granted_mode != lock->l_req_mode) {
-                /* this blocking AST will be communicated as part of the
-                 * completion AST instead */
-                unlock_res(lock->l_resource);
-                ptlrpc_req_finished(req);
-                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
-                RETURN(0);
-        }
+       lock_res_and_lock(lock);
+       if (lock->l_granted_mode != lock->l_req_mode) {
+               /* this blocking AST will be communicated as part of the
+                * completion AST instead */
+               unlock_res_and_lock(lock);
 
-        if (lock->l_destroyed) {
-                /* What's the point? */
-                unlock_res(lock->l_resource);
-                ptlrpc_req_finished(req);
-                RETURN(0);
-        }
+               ptlrpc_req_finished(req);
+               LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
+               RETURN(0);
+       }
+
+       if (lock->l_destroyed) {
+               /* What's the point? */
+               unlock_res_and_lock(lock);
+               ptlrpc_req_finished(req);
+               RETURN(0);
+       }
 
         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                 instant_cancel = 1;
@@ -800,14 +830,14 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         LDLM_DEBUG(lock, "server preparing blocking AST");
 
         ptlrpc_request_set_replen(req);
-        if (instant_cancel) {
-                unlock_res(lock->l_resource);
-                ldlm_lock_cancel(lock);
-        } else {
-                LASSERT(lock->l_granted_mode == lock->l_req_mode);
-                ldlm_add_waiting_lock(lock);
-                unlock_res(lock->l_resource);
-        }
+       if (instant_cancel) {
+               unlock_res_and_lock(lock);
+               ldlm_lock_cancel(lock);
+       } else {
+               LASSERT(lock->l_granted_mode == lock->l_req_mode);
+               ldlm_add_waiting_lock(lock);
+               unlock_res_and_lock(lock);
+       }
 
         req->rq_send_state = LUSTRE_IMP_FULL;
         /* ptlrpc_request_alloc_pack already set timeout */
@@ -2644,6 +2674,7 @@ static int ldlm_setup(void)
                        GOTO(out, rc);
         }
 
+# ifdef HAVE_SERVER_SUPPORT
         CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
         expired_lock_thread.elt_state = ELT_STOPPED;
         cfs_waitq_init(&expired_lock_thread.elt_waitq);
@@ -2660,6 +2691,7 @@ static int ldlm_setup(void)
 
         cfs_wait_event(expired_lock_thread.elt_waitq,
                        expired_lock_thread.elt_state == ELT_READY);
+# endif /* HAVE_SERVER_SUPPORT */
 
         rc = ldlm_pools_init();
         if (rc)
@@ -2713,15 +2745,18 @@ static int ldlm_cleanup(void)
        if (ldlm_state->ldlm_cancel_service != NULL)
                ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
 # endif
+
 #ifdef __KERNEL__
        ldlm_proc_cleanup();
 
+# ifdef HAVE_SERVER_SUPPORT
        if (expired_lock_thread.elt_state != ELT_STOPPED) {
                expired_lock_thread.elt_state = ELT_TERMINATE;
                cfs_waitq_signal(&expired_lock_thread.elt_waitq);
                cfs_wait_event(expired_lock_thread.elt_waitq,
                               expired_lock_thread.elt_state == ELT_STOPPED);
        }
+# endif
 #endif /* __KERNEL__ */
 
         OBD_FREE(ldlm_state, sizeof(*ldlm_state));