Whamcloud - gitweb
b=11880
authorhuangwei <huangwei>
Mon, 2 Jul 2007 09:59:34 +0000 (09:59 +0000)
committerhuangwei <huangwei>
Mon, 2 Jul 2007 09:59:34 +0000 (09:59 +0000)
r=alex,oleg

landing patch from 11880.

lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c

index 80f87e0..4dba55b 100644 (file)
@@ -88,7 +88,10 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         if (flags == LDLM_FL_WAIT_NOREPROC) {
                 /* client side - set a flag to prevent sending a CANCEL */
                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
-                ldlm_lock_decref_internal(lock, mode);
+
+                /* when reaching here, it is under lock_res_and_lock(). Thus, 
+                   need call the nolock version of ldlm_lock_decref_internal*/
+                ldlm_lock_decref_internal_nolock(lock, mode);
         }
 
         ldlm_lock_destroy_nolock(lock);
@@ -137,6 +140,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         int local = ns->ns_client;
         int added = (mode == LCK_NL);
         int overlaps = 0;
+        int splitted = 0;
         ENTRY;
 
         CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
@@ -155,6 +159,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 req->l_blocking_ast = ldlm_flock_blocking_ast;
         }
 
+reprocess:
         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                 /* This loop determines where this processes locks start
                  * in the resource lr_granted list. */
@@ -334,15 +339,22 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 /* XXX - if ldlm_lock_new() can sleep we should
                  * release the ns_lock, allocate the new lock,
                  * and restart processing this lock. */
-                new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK,
+                if (!new2) {
+                        unlock_res_and_lock(req);
+                         new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK,
                                         lock->l_granted_mode, NULL, NULL, NULL,
                                         NULL, 0);
-                if (!new2) {
-                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
-                        *err = -ENOLCK;
-                        RETURN(LDLM_ITER_STOP);
+                        lock_res_and_lock(req);
+                        if (!new2) {
+                                ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
+                                *err = -ENOLCK;
+                                RETURN(LDLM_ITER_STOP);
+                        }
+                        goto reprocess;
                 }
 
+                splitted = 1;
+
                 new2->l_granted_mode = lock->l_granted_mode;
                 new2->l_policy_data.l_flock.pid =
                         new->l_policy_data.l_flock.pid;
@@ -360,8 +372,9 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                  &new2->l_export->exp_ldlm_data.led_held_locks);
                         spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);
                 }
-                if (*flags == LDLM_FL_WAIT_NOREPROC)
-                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+                if (*flags == LDLM_FL_WAIT_NOREPROC) {
+                        ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
+                }
 
                 /* insert new2 at lock */
                 ldlm_resource_add_lock(res, ownlocks, new2);
@@ -369,6 +382,10 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 break;
         }
 
+        /* if new2 is created but never used, destroy it*/
+        if (splitted == 0 && new2 != NULL)
+                ldlm_lock_destroy_nolock(new2);
+
         /* At this point we're granting the lock request. */
         req->l_granted_mode = req->l_req_mode;
 
@@ -398,9 +415,9 @@ restart:
                                 ldlm_reprocess_queue(res, &res->lr_waiting,
                                                      &rpc_list);
 
-                                unlock_res(res);
-                                rc = ldlm_run_bl_ast_work(&rpc_list);
-                                lock_res(res);
+                                unlock_res_and_lock(req);
+                                rc = ldlm_run_cp_ast_work(&rpc_list);
+                                lock_res_and_lock(req);
                                 if (rc == -ERESTART)
                                         GOTO(restart, -ERESTART);
                        }
@@ -500,7 +517,7 @@ granted:
 
         LDLM_DEBUG(lock, "client-side enqueue granted");
         ns = lock->l_resource->lr_namespace;
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 
         /* take lock off the deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
@@ -535,7 +552,7 @@ granted:
                 if (flags == 0)
                         cfs_waitq_signal(&lock->l_waitq);
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         RETURN(0);
 }
 EXPORT_SYMBOL(ldlm_flock_completion_ast);
index 1b9ac7d..a11a85a 100644 (file)
@@ -30,7 +30,9 @@ ldlm_lock_create(struct ldlm_namespace *ns, struct ldlm_res_id,
 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
                                void *cookie, int *flags);
 void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, __u32 mode);
 void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode);
+void ldlm_lock_decref_internal_nolock(struct ldlm_lock *, __u32 mode);
 void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                                 struct list_head *work_list);
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
index 12995ee..e95d2c3 100644 (file)
@@ -545,15 +545,12 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
         unlock_res_and_lock(lock);
 }
 
-void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
+/* only called in ldlm_flock_destroy and for local locks.
+ * for LDLM_FLOCK type locks, l_blocking_ast is null, and
+ * ldlm_lock_remove_from_lru() does nothing, it is safe 
+ * for ldlm_flock_destroy usage by dropping some code */
+void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
 {
-        struct ldlm_namespace *ns;
-        ENTRY;
-
-        lock_res_and_lock(lock);
-
-        ns = lock->l_resource->lr_namespace;
-
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                 LASSERT(lock->l_readers > 0);
@@ -564,6 +561,19 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 lock->l_writers--;
         }
 
+        LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
+}
+void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
+{
+        struct ldlm_namespace *ns;
+        ENTRY;
+
+        lock_res_and_lock(lock);
+
+        ns = lock->l_resource->lr_namespace;
+
+        ldlm_lock_decref_internal_nolock(lock, mode);
+
         if (lock->l_flags & LDLM_FL_LOCAL &&
             !lock->l_readers && !lock->l_writers) {
                 /* If this is a local lock on a server namespace and this was
@@ -609,8 +619,6 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 unlock_res_and_lock(lock);
         }
 
-        LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
-
         EXIT;
 }