Whamcloud - gitweb
b=23289 cleanup unnecessary spinlock dance in ldlm
authorJian Yu <jian.yu@oracle.com>
Tue, 2 Nov 2010 14:05:34 +0000 (22:05 +0800)
committerVitaly Fertman <vitaly.fertman@sun.com>
Thu, 4 Nov 2010 18:30:31 +0000 (21:30 +0300)
o=liang
i=andreas.dilger
i=oleg.drokin

lustre/include/lustre_dlm.h
lustre/ldlm/l_lock.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c

index 4b3521d..ce63a5b 100644 (file)
@@ -662,7 +662,10 @@ struct ldlm_lock {
          * Protected by lock and resource locks.
          */
         __u8                  l_destroyed;
          * Protected by lock and resource locks.
          */
         __u8                  l_destroyed;
-
+        /**
+         * flag whether this is a server namespace lock
+         */
+        __u8                  l_ns_srv;
         /**
          * If the lock is granted, a process sleeps on this waitq to learn when
          * it's no longer in use.  If the lock is not granted, a process sleeps
         /**
          * If the lock is granted, a process sleeps on this waitq to learn when
          * it's no longer in use.  If the lock is not granted, a process sleeps
index 0d21d24..8a77cfe 100644 (file)
  */
 struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock)
 {
  */
 struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock)
 {
-        struct ldlm_resource *res = NULL;
+        /* on server-side resource of lock doesn't change */
+        if (!lock->l_ns_srv)
+                cfs_spin_lock(&lock->l_lock);
 
 
-        cfs_spin_lock(&lock->l_lock);
-        res = lock->l_resource;
-
-        if (ns_is_server(ldlm_res_to_ns(res)))
-                /* on server-side resource of lock doesn't change */
-                cfs_spin_unlock(&lock->l_lock);
-
-        lock_res(res);
-        return res;
+        lock_res(lock->l_resource);
+        return lock->l_resource;
 }
 
 void unlock_res_and_lock(struct ldlm_lock *lock)
 {
 }
 
 void unlock_res_and_lock(struct ldlm_lock *lock)
 {
-        struct ldlm_resource *res = lock->l_resource;
-
-        if (ns_is_server(ldlm_res_to_ns(res))) {
-                /* on server-side resource of lock doesn't change */
-                unlock_res(res);
-                return;
-        }
-
-        unlock_res(res);
-        cfs_spin_unlock(&lock->l_lock);
+        /* on server-side resource of lock doesn't change */
+        unlock_res(lock->l_resource);
+        if (!lock->l_ns_srv)
+                cfs_spin_unlock(&lock->l_lock);
 }
 }
index e3b255e..4779bde 100644 (file)
@@ -203,6 +203,11 @@ int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
         int rc;
 
         ENTRY;
         int rc;
 
         ENTRY;
+        if (lock->l_ns_srv) {
+                LASSERT(cfs_list_empty(&lock->l_lru));
+                RETURN(0);
+        }
+
         cfs_spin_lock(&ns->ns_unused_lock);
         rc = ldlm_lock_remove_from_lru_nolock(lock);
         cfs_spin_unlock(&ns->ns_unused_lock);
         cfs_spin_lock(&ns->ns_unused_lock);
         rc = ldlm_lock_remove_from_lru_nolock(lock);
         cfs_spin_unlock(&ns->ns_unused_lock);
@@ -238,6 +243,12 @@ void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
 
         ENTRY;
         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
 
         ENTRY;
+        if (lock->l_ns_srv) {
+                LASSERT(cfs_list_empty(&lock->l_lru));
+                EXIT;
+                return;
+        }
+
         cfs_spin_lock(&ns->ns_unused_lock);
         if (!cfs_list_empty(&lock->l_lru)) {
                 ldlm_lock_remove_from_lru_nolock(lock);
         cfs_spin_lock(&ns->ns_unused_lock);
         if (!cfs_list_empty(&lock->l_lru)) {
                 ldlm_lock_remove_from_lru_nolock(lock);
@@ -334,8 +345,7 @@ static void lock_handle_addref(void *lock)
 
 /*
  * usage: pass in a resource on which you have done ldlm_resource_get
 
 /*
  * usage: pass in a resource on which you have done ldlm_resource_get
- *        pass in a parent lock on which you have done a ldlm_lock_get
- *        after return, ldlm_*_put the resource and parent
+ *        new lock will take over the refcount.
  * returns: lock with refcount 2 - one for current caller and one for remote
  */
 static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
  * returns: lock with refcount 2 - one for current caller and one for remote
  */
 static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
@@ -351,7 +361,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
                 RETURN(NULL);
 
         cfs_spin_lock_init(&lock->l_lock);
                 RETURN(NULL);
 
         cfs_spin_lock_init(&lock->l_lock);
-        lock->l_resource = ldlm_resource_getref(resource);
+        lock->l_resource = resource;
         lu_ref_add(&resource->lr_reference, "lock", lock);
 
         cfs_atomic_set(&lock->l_refc, 2);
         lu_ref_add(&resource->lr_reference, "lock", lock);
 
         cfs_atomic_set(&lock->l_refc, 2);
@@ -695,7 +705,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
             (lock->l_flags & LDLM_FL_CBPENDING)) {
                 /* If we received a blocked AST and this was the last reference,
                  * run the callback. */
             (lock->l_flags & LDLM_FL_CBPENDING)) {
                 /* If we received a blocked AST and this was the last reference,
                  * run the callback. */
-                if (ns_is_server(ns) && lock->l_export)
+                if (lock->l_ns_srv && lock->l_export)
                         CERROR("FL_CBPENDING set on non-local lock--just a "
                                "warning\n");
 
                         CERROR("FL_CBPENDING set on non-local lock--just a "
                                "warning\n");
 
@@ -1176,7 +1186,6 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                 RETURN(NULL);
 
         lock = ldlm_lock_new(res);
                 RETURN(NULL);
 
         lock = ldlm_lock_new(res);
-        ldlm_resource_putref(res);
 
         if (lock == NULL)
                 RETURN(NULL);
 
         if (lock == NULL)
                 RETURN(NULL);
@@ -1184,6 +1193,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
         lock->l_req_mode = mode;
         lock->l_ast_data = data;
         lock->l_pid = cfs_curproc_pid();
         lock->l_req_mode = mode;
         lock->l_ast_data = data;
         lock->l_pid = cfs_curproc_pid();
+        lock->l_ns_srv = ns_is_server(ns);
         if (cbs) {
                 lock->l_blocking_ast = cbs->lcs_blocking;
                 lock->l_completion_ast = cbs->lcs_completion;
         if (cbs) {
                 lock->l_blocking_ast = cbs->lcs_blocking;
                 lock->l_completion_ast = cbs->lcs_completion;
index 19e0c86..1f07482 100644 (file)
@@ -827,11 +827,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (req == NULL)
                 RETURN(-ENOMEM);
 
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        lock_res_and_lock(lock);
-        if (lock->l_resource->lr_lvb_len)
+        /* server namespace, doesn't need lock */
+        if (lock->l_resource->lr_lvb_len) {
                  req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
                                       lock->l_resource->lr_lvb_len);
                  req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
                                       lock->l_resource->lr_lvb_len);
-        unlock_res_and_lock(lock);
+        }
 
         rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
         if (rc) {
 
         rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
         if (rc) {
@@ -938,10 +938,9 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         body->lock_handle[0] = lock->l_remote_handle;
         ldlm_lock2desc(lock, &body->lock_desc);
 
         body->lock_handle[0] = lock->l_remote_handle;
         ldlm_lock2desc(lock, &body->lock_desc);
 
-        lock_res_and_lock(lock);
+        /* server namespace, doesn't need lock */
         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                              lock->l_resource->lr_lvb_len);
         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                              lock->l_resource->lr_lvb_len);
-        unlock_res_and_lock(lock);
         res = lock->l_resource;
         ptlrpc_request_set_replen(req);
 
         res = lock->l_resource;
         ptlrpc_request_set_replen(req);
 
index 50bdf14..0f30f16 100644 (file)
@@ -403,13 +403,15 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         if (unlikely(!lock))
                 GOTO(out_nolock, err = -ENOMEM);
 
         if (unlikely(!lock))
                 GOTO(out_nolock, err = -ENOMEM);
 
-        ldlm_lock_addref_internal(lock, mode);
         ldlm_lock2handle(lock, lockh);
         ldlm_lock2handle(lock, lockh);
-        lock_res_and_lock(lock);
+
+        /* NB: we don't have any lock now (lock_res_and_lock)
+         * because it's a new lock */
+        ldlm_lock_addref_internal_nolock(lock, mode);
         lock->l_flags |= LDLM_FL_LOCAL;
         if (*flags & LDLM_FL_ATOMIC_CB)
                 lock->l_flags |= LDLM_FL_ATOMIC_CB;
         lock->l_flags |= LDLM_FL_LOCAL;
         if (*flags & LDLM_FL_ATOMIC_CB)
                 lock->l_flags |= LDLM_FL_ATOMIC_CB;
-        unlock_res_and_lock(lock);
+
         if (policy != NULL)
                 lock->l_policy_data = *policy;
         if (client_cookie != NULL)
         if (policy != NULL)
                 lock->l_policy_data = *policy;
         if (client_cookie != NULL)