Whamcloud - gitweb
b=13872
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 2f1450b..e83d396 100644 (file)
@@ -171,10 +171,11 @@ int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
 {
         int rc = 0;
         if (!list_empty(&lock->l_lru)) {
+                struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
                 LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                 list_del_init(&lock->l_lru);
-                lock->l_resource->lr_namespace->ns_nr_unused--;
-                LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
+                ns->ns_nr_unused--;
+                LASSERT(ns->ns_nr_unused >= 0);
                 rc = 1;
         }
         return rc;
@@ -182,15 +183,49 @@ int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
 
 int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
 {
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
         int rc;
         ENTRY;
-        spin_lock(&lock->l_resource->lr_namespace->ns_unused_lock);
+        spin_lock(&ns->ns_unused_lock);
         rc = ldlm_lock_remove_from_lru_nolock(lock);
-        spin_unlock(&lock->l_resource->lr_namespace->ns_unused_lock);
+        spin_unlock(&ns->ns_unused_lock);
         EXIT;
         return rc;
 }
 
+void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
+{
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+        lock->l_last_used = cfs_time_current();
+        LASSERT(list_empty(&lock->l_lru));
+        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
+        LASSERT(ns->ns_nr_unused >= 0);
+        ns->ns_nr_unused++;
+}
+
+void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
+{
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+        ENTRY;
+        spin_lock(&ns->ns_unused_lock);
+        ldlm_lock_add_to_lru_nolock(lock);
+        spin_unlock(&ns->ns_unused_lock);
+        EXIT;
+}
+
+void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
+{
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+        ENTRY;
+        spin_lock(&ns->ns_unused_lock);
+        if (!list_empty(&lock->l_lru)) {
+                ldlm_lock_remove_from_lru_nolock(lock);
+                ldlm_lock_add_to_lru_nolock(lock);
+        }
+        spin_unlock(&ns->ns_unused_lock);
+        EXIT;
+}
+
 /* This used to have a 'strict' flag, which recovery would use to mark an
  * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
  * shall explain why it's gone: with the new hash table scheme, once you call
@@ -291,6 +326,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         if (lock == NULL)
                 RETURN(NULL);
 
+        spin_lock_init(&lock->l_lock);
         lock->l_resource = ldlm_resource_getref(resource);
 
         atomic_set(&lock->l_refc, 2);
@@ -302,7 +338,6 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
         cfs_waitq_init(&lock->l_waitq);
         lock->l_blocking_lock = NULL;
-        lock->l_pidb = 0;
         lock->l_sl_mode.prev = NULL;
         lock->l_sl_mode.next = NULL;
         lock->l_sl_policy.prev = NULL;
@@ -323,7 +358,7 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         int type;
         ENTRY;
 
-        LASSERT(ns->ns_client != 0);
+        LASSERT(ns_is_client(ns));
 
         lock_res_and_lock(lock);
         if (memcmp(new_resid, &lock->l_resource->lr_name,
@@ -352,9 +387,8 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                        sizeof(lock->l_resource->lr_name)) != 0);
         lock_res(newres);
         lock->l_resource = newres;
-        unlock_res(newres);
         unlock_res(oldres);
-        unlock_bitlock(lock);
+        unlock_res_and_lock(lock);
 
         /* ...and the flowers are still standing! */
         ldlm_resource_putref(oldres);
@@ -532,7 +566,6 @@ void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
                 lock->l_readers++;
         if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP))
                 lock->l_writers++;
-        lock->l_last_used = cfs_time_current();
         LDLM_LOCK_GET(lock);
         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
 }
@@ -576,7 +609,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
             (lock->l_flags & LDLM_FL_CBPENDING)) {
                 /* If we received a blocked AST and this was the last reference,
                  * run the callback. */
-                if (ns->ns_client == LDLM_NAMESPACE_SERVER && lock->l_export)
+                if (ns_is_server(ns) && lock->l_export)
                         CERROR("FL_CBPENDING set on non-local lock--just a "
                                "warning\n");
 
@@ -586,25 +619,21 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 ldlm_lock_remove_from_lru(lock);
                 unlock_res_and_lock(lock);
                 if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
-                                ldlm_bl_to_thread(ns, NULL, lock, 0) != 0)
+                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                         ldlm_handle_bl_callback(ns, NULL, lock);
-        } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
+        } else if (ns_is_client(ns) &&
                    !lock->l_readers && !lock->l_writers &&
                    !(lock->l_flags & LDLM_FL_NO_LRU)) {
                 /* If this is a client-side namespace and this was the last
                  * reference, put it on the LRU. */
-                LASSERT(list_empty(&lock->l_lru));
-                LASSERT(ns->ns_nr_unused >= 0);
-                lock->l_last_used = cfs_time_current();
-                spin_lock(&ns->ns_unused_lock);
-                list_add_tail(&lock->l_lru, &ns->ns_unused_list);
-                ns->ns_nr_unused++;
-                spin_unlock(&ns->ns_unused_lock);
+                ldlm_lock_add_to_lru(lock);
                 unlock_res_and_lock(lock);
-                /* Call ldlm_cancel_lru() only if EARLY_CANCEL is not supported
-                 * by the server, otherwise, it is done on enqueue. */
-                if (!exp_connect_cancelset(lock->l_conn_export))
-                        ldlm_cancel_lru(ns, LDLM_ASYNC);
+                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE 
+                 * are not supported by the server, otherwise, it is done on 
+                 * enqueue. */
+                if (!exp_connect_cancelset(lock->l_conn_export) && 
+                    !exp_connect_lru_resize(lock->l_conn_export))
+                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC);
         } else {
                 unlock_res_and_lock(lock);
         }
@@ -857,12 +886,14 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
         if (work_list && lock->l_completion_ast != NULL)
                 ldlm_add_ast_work_item(lock, NULL, work_list);
 
+        ldlm_pool_add(&res->lr_namespace->ns_pool, lock);
         EXIT;
 }
 
 /* returns a referenced lock or NULL.  See the flag descriptions below, in the
  * comment above ldlm_lock_match */
-static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
+static struct ldlm_lock *search_queue(struct list_head *queue,
+                                      ldlm_mode_t *mode,
                                       ldlm_policy_data_t *policy,
                                       struct ldlm_lock *old_lock, int flags)
 {
@@ -870,6 +901,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
         struct list_head *tmp;
 
         list_for_each(tmp, queue) {
+                ldlm_mode_t match;
+
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (lock == old_lock)
@@ -888,8 +921,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     lock->l_readers == 0 && lock->l_writers == 0)
                         continue;
 
-                if (!(lock->l_req_mode & mode))
+                if (!(lock->l_req_mode & *mode))
                         continue;
+                match = lock->l_req_mode;
 
                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
                     (lock->l_policy_data.l_extent.start >
@@ -897,7 +931,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                      lock->l_policy_data.l_extent.end < policy->l_extent.end))
                         continue;
 
-                if (unlikely(mode == LCK_GROUP) &&
+                if (unlikely(match == LCK_GROUP) &&
                     lock->l_resource->lr_type == LDLM_EXTENT &&
                     lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                         continue;
@@ -917,10 +951,13 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     !(lock->l_flags & LDLM_FL_LOCAL))
                         continue;
 
-                if (flags & LDLM_FL_TEST_LOCK)
+                if (flags & LDLM_FL_TEST_LOCK) {
                         LDLM_LOCK_GET(lock);
-                else
-                        ldlm_lock_addref_internal_nolock(lock, mode);
+                        ldlm_lock_touch_in_lru(lock);
+                } else {
+                        ldlm_lock_addref_internal_nolock(lock, match);
+                }
+                *mode = match;
                 return lock;
         }
 
@@ -959,10 +996,10 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock)
  * caller code unchanged), the context failure will be discovered by caller
  * sometime later.
  */
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
-                    const struct ldlm_res_id *res_id, ldlm_type_t type,
-                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                    struct lustre_handle *lockh)
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+                            const struct ldlm_res_id *res_id, ldlm_type_t type,
+                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                            struct lustre_handle *lockh)
 {
         struct ldlm_resource *res;
         struct ldlm_lock *lock, *old_lock = NULL;
@@ -987,15 +1024,15 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 
         lock_res(res);
 
-        lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_granted, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
         if (flags & LDLM_FL_BLOCK_GRANTED)
                 GOTO(out, rc = 0);
-        lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_converting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
-        lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
 
@@ -1063,7 +1100,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
         if (old_lock)
                 LDLM_LOCK_PUT(old_lock);
 
-        return rc;
+        return rc ? mode : 0;
 }
 
 /* Returns a referenced lock */
@@ -1115,7 +1152,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
 {
         struct ldlm_lock *lock = *lockp;
         struct ldlm_resource *res = lock->l_resource;
-        int local = res->lr_namespace->ns_client;
+        int local = ns_is_client(res->lr_namespace);
         ldlm_processing_policy policy;
         ldlm_error_t rc = ELDLM_OK;
         ENTRY;
@@ -1355,7 +1392,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         ENTRY;
 
         /* Local lock trees don't get reprocessed. */
-        if (res->lr_namespace->ns_client) {
+        if (ns_is_client(res->lr_namespace)) {
                 EXIT;
                 return;
         }
@@ -1390,6 +1427,7 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
                         LDLM_DEBUG(lock, "no blocking ast");
                 }
         }
+        lock->l_flags |= LDLM_FL_BL_DONE;
 }
 
 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
@@ -1473,6 +1511,13 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ldlm_del_waiting_lock(lock); 
         ldlm_resource_unlink_lock(lock);
         ldlm_lock_destroy_nolock(lock);
+
+        if (lock->l_granted_mode == lock->l_req_mode)
+                ldlm_pool_del(&ns->ns_pool, lock);
+
+        /* Make sure we will not be called again for same lock what is possible
+         * if not to zero out lock->l_granted_mode */
+        lock->l_granted_mode = 0;
         unlock_res_and_lock(lock);
 
         EXIT;
@@ -1505,6 +1550,8 @@ void ldlm_cancel_locks_for_export(struct obd_export *exp)
                 spin_unlock(&exp->exp_ldlm_data.led_lock);
 
                 LDLM_DEBUG(lock, "export %p", exp);
+                ldlm_res_lvbo_update(res, NULL, 0, 1);
+
                 ldlm_lock_cancel(lock);
                 ldlm_reprocess_all(res);
 
@@ -1572,7 +1619,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
         ldlm_resource_unlink_lock(lock);
 
         /* If this is a local resource, put it on the appropriate list. */
-        if (res->lr_namespace->ns_client) {
+        if (ns_is_client(res->lr_namespace)) {
                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
                 } else {