add support for building HEAD without vfs_intent patches.
[fs/lustre-release.git] lustre/ldlm/ldlm_lock.c
index 9db42be..b42d6a5 100644
@@ -28,7 +28,9 @@
 
 #ifdef __KERNEL__
 # include <libcfs/libcfs.h>
+# ifndef HAVE_VFS_INTENT_PATCHES
 # include <linux/lustre_intent.h>
+# endif
 #else
 # include <liblustre.h>
 # include <libcfs/kp30.h>
@@ -171,10 +173,11 @@ int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
 {
         int rc = 0;
         if (!list_empty(&lock->l_lru)) {
+                struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
                 LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                 list_del_init(&lock->l_lru);
-                lock->l_resource->lr_namespace->ns_nr_unused--;
-                LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
+                ns->ns_nr_unused--;
+                LASSERT(ns->ns_nr_unused >= 0);
                 rc = 1;
         }
         return rc;
@@ -182,15 +185,49 @@ int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
 
 int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
 {
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
         int rc;
         ENTRY;
-        spin_lock(&lock->l_resource->lr_namespace->ns_unused_lock);
+        spin_lock(&ns->ns_unused_lock);
         rc = ldlm_lock_remove_from_lru_nolock(lock);
-        spin_unlock(&lock->l_resource->lr_namespace->ns_unused_lock);
+        spin_unlock(&ns->ns_unused_lock);
         EXIT;
         return rc;
 }
 
+void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
+{
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+        lock->l_last_used = cfs_time_current();
+        LASSERT(list_empty(&lock->l_lru));
+        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
+        LASSERT(ns->ns_nr_unused >= 0);
+        ns->ns_nr_unused++;
+}
+
+void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
+{
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+        ENTRY;
+        spin_lock(&ns->ns_unused_lock);
+        ldlm_lock_add_to_lru_nolock(lock);
+        spin_unlock(&ns->ns_unused_lock);
+        EXIT;
+}
+
+void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
+{
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+        ENTRY;
+        spin_lock(&ns->ns_unused_lock);
+        if (!list_empty(&lock->l_lru)) {
+                ldlm_lock_remove_from_lru_nolock(lock);
+                ldlm_lock_add_to_lru_nolock(lock);
+        }
+        spin_unlock(&ns->ns_unused_lock);
+        EXIT;
+}
+
 /* This used to have a 'strict' flag, which recovery would use to mark an
  * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
  * shall explain why it's gone: with the new hash table scheme, once you call
@@ -323,7 +360,7 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         int type;
         ENTRY;
 
-        LASSERT(ns->ns_client != 0);
+        LASSERT(ns_is_client(ns));
 
         lock_res_and_lock(lock);
         if (memcmp(new_resid, &lock->l_resource->lr_name,
@@ -531,7 +568,6 @@ void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
                 lock->l_readers++;
         if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP))
                 lock->l_writers++;
-        lock->l_last_used = cfs_time_current();
         LDLM_LOCK_GET(lock);
         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
 }
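The l_last_used timestamp is no longer refreshed on every addref; it is now stamped in ldlm_lock_add_to_lru_nolock() when the lock actually goes onto the namespace LRU. A minimal usage sketch of the new helpers (the caller names are hypothetical; this only mirrors how ldlm_lock_decref_internal() and search_queue() use them later in this patch):

/* Hypothetical callers, not part of this patch. */
static void example_release_unused_lock(struct ldlm_lock *lock)
{
        /* last reader/writer reference dropped: park the lock on the
         * namespace LRU; the helper takes ns_unused_lock itself */
        ldlm_lock_add_to_lru(lock);
}

static void example_reuse_cached_lock(struct ldlm_lock *lock)
{
        /* a cached lock was matched again: move it to the tail (the
         * most-recently-used end) of the LRU; a no-op if the lock is
         * not currently on the LRU */
        ldlm_lock_touch_in_lru(lock);
}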
@@ -575,7 +611,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
             (lock->l_flags & LDLM_FL_CBPENDING)) {
                 /* If we received a blocked AST and this was the last reference,
                  * run the callback. */
-                if (ns->ns_client == LDLM_NAMESPACE_SERVER && lock->l_export)
+                if (ns_is_server(ns) && lock->l_export)
                         CERROR("FL_CBPENDING set on non-local lock--just a "
                                "warning\n");
 
@@ -587,23 +623,19 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                     ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                         ldlm_handle_bl_callback(ns, NULL, lock);
-        } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
+        } else if (ns_is_client(ns) &&
                    !lock->l_readers && !lock->l_writers &&
                    !(lock->l_flags & LDLM_FL_NO_LRU)) {
                 /* If this is a client-side namespace and this was the last
                  * reference, put it on the LRU. */
-                LASSERT(list_empty(&lock->l_lru));
-                LASSERT(ns->ns_nr_unused >= 0);
-                lock->l_last_used = cfs_time_current();
-                spin_lock(&ns->ns_unused_lock);
-                list_add_tail(&lock->l_lru, &ns->ns_unused_list);
-                ns->ns_nr_unused++;
-                spin_unlock(&ns->ns_unused_lock);
+                ldlm_lock_add_to_lru(lock);
                 unlock_res_and_lock(lock);
-                /* Call ldlm_cancel_lru() only if EARLY_CANCEL is not supported
-                 * by the server, otherwise, it is done on enqueue. */
-                if (!exp_connect_cancelset(lock->l_conn_export))
-                        ldlm_cancel_lru(ns, LDLM_ASYNC);
+                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
+                 * are not supported by the server; otherwise it is done on
+                 * enqueue. */
+                if (!exp_connect_cancelset(lock->l_conn_export) && 
+                    !ns_connect_lru_resize(ns))
+                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC);
         } else {
                 unlock_res_and_lock(lock);
         }
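ldlm_cancel_lru() is now skipped when either early cancellation (cancel on enqueue) or LRU resize has been negotiated with the server. As a rough sketch of what the two predicates are assumed to test (this is not part of the patch; the flag and field names are an assumption based on Lustre's usual connect-flag convention):

/* Assumed shape of the predicates used above. */
static inline int example_connect_cancelset(struct obd_export *exp)
{
        return exp != NULL &&
               (exp->exp_connect_flags & OBD_CONNECT_CANCELSET);
}

static inline int example_connect_lru_resize(struct ldlm_namespace *ns)
{
        return !!(ns->ns_connect_flags & OBD_CONNECT_LRU_RESIZE);
}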
@@ -856,12 +888,14 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
         if (work_list && lock->l_completion_ast != NULL)
                 ldlm_add_ast_work_item(lock, NULL, work_list);
 
+        ldlm_pool_add(&res->lr_namespace->ns_pool, lock);
         EXIT;
 }
 
 /* returns a referenced lock or NULL.  See the flag descriptions below, in the
  * comment above ldlm_lock_match */
-static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
+static struct ldlm_lock *search_queue(struct list_head *queue,
+                                      ldlm_mode_t *mode,
                                       ldlm_policy_data_t *policy,
                                       struct ldlm_lock *old_lock, int flags)
 {
@@ -869,6 +903,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
         struct list_head *tmp;
 
         list_for_each(tmp, queue) {
+                ldlm_mode_t match;
+
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (lock == old_lock)
@@ -887,8 +923,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     lock->l_readers == 0 && lock->l_writers == 0)
                         continue;
 
-                if (!(lock->l_req_mode & mode))
+                if (!(lock->l_req_mode & *mode))
                         continue;
+                match = lock->l_req_mode;
 
                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
                     (lock->l_policy_data.l_extent.start >
@@ -896,7 +933,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                      lock->l_policy_data.l_extent.end < policy->l_extent.end))
                         continue;
 
-                if (unlikely(mode == LCK_GROUP) &&
+                if (unlikely(match == LCK_GROUP) &&
                     lock->l_resource->lr_type == LDLM_EXTENT &&
                     lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                         continue;
@@ -916,10 +953,13 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     !(lock->l_flags & LDLM_FL_LOCAL))
                         continue;
 
-                if (flags & LDLM_FL_TEST_LOCK)
+                if (flags & LDLM_FL_TEST_LOCK) {
                         LDLM_LOCK_GET(lock);
-                else
-                        ldlm_lock_addref_internal_nolock(lock, mode);
+                        ldlm_lock_touch_in_lru(lock);
+                } else {
+                        ldlm_lock_addref_internal_nolock(lock, match);
+                }
+                *mode = match;
                 return lock;
         }
 
@@ -958,10 +998,10 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock)
  * caller code unchanged), the context failure will be discovered by caller
  * sometime later.
  */
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
-                    const struct ldlm_res_id *res_id, ldlm_type_t type,
-                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                    struct lustre_handle *lockh)
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+                            const struct ldlm_res_id *res_id, ldlm_type_t type,
+                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                            struct lustre_handle *lockh)
 {
         struct ldlm_resource *res;
         struct ldlm_lock *lock, *old_lock = NULL;
@@ -986,15 +1026,15 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 
         lock_res(res);
 
-        lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_granted, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
         if (flags & LDLM_FL_BLOCK_GRANTED)
                 GOTO(out, rc = 0);
-        lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_converting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
-        lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
 
@@ -1062,7 +1102,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
         if (old_lock)
                 LDLM_LOCK_PUT(old_lock);
 
-        return rc;
+        return rc ? mode : 0;
 }
 
 /* Returns a referenced lock */
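With this change ldlm_lock_match() takes a bitmask of acceptable modes and reports which one actually matched (0 on failure) instead of a plain success flag: search_queue() records the matched lock's l_req_mode and hands it back through the mode pointer. A hedged caller sketch (the helper name is hypothetical; the call itself follows the new signature above):

/* Hypothetical caller, not part of this patch: look for a cached PR or
 * PW extent lock and report whether one was found. */
static int example_match_pr_or_pw(struct ldlm_namespace *ns,
                                  const struct ldlm_res_id *res_id,
                                  ldlm_policy_data_t *policy)
{
        struct lustre_handle lockh;
        ldlm_mode_t mode;

        /* accept either mode; LDLM_FL_BLOCK_GRANTED limits the search to
         * already-granted locks */
        mode = ldlm_lock_match(ns, LDLM_FL_BLOCK_GRANTED, res_id, LDLM_EXTENT,
                               policy, LCK_PR | LCK_PW, &lockh);
        if (mode == 0)
                return 0;

        /* a reference was taken under the matched mode, so the same mode
         * must be used when dropping it */
        ldlm_lock_decref(&lockh, mode);
        return 1;
}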
@@ -1114,7 +1154,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
 {
         struct ldlm_lock *lock = *lockp;
         struct ldlm_resource *res = lock->l_resource;
-        int local = res->lr_namespace->ns_client;
+        int local = ns_is_client(res->lr_namespace);
         ldlm_processing_policy policy;
         ldlm_error_t rc = ELDLM_OK;
         ENTRY;
@@ -1230,13 +1270,38 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
         RETURN(rc);
 }
 
+/* Helper function for the ldlm_run_{bl,cp}_ast_work() pair.
+ *
+ * Send the existing RPC set specified by @arg->set and then
+ * destroy it. Create a new one if the @do_create flag is set. */
+static void
+ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
+{
+        int rc;
+
+        rc = ptlrpc_set_wait(arg->set);
+        if (arg->type == LDLM_BL_CALLBACK)
+                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
+        ptlrpc_set_destroy(arg->set);
+
+        if (do_create)
+                arg->set = ptlrpc_prep_set();
+}
+
 int ldlm_run_bl_ast_work(struct list_head *rpc_list)
 {
+        struct ldlm_cb_set_arg arg;
         struct list_head *tmp, *pos;
         struct ldlm_lock_desc d;
-        int rc = 0, retval = 0;
+        int ast_count;
+        int rc = 0;
         ENTRY;
 
+        arg.set = ptlrpc_prep_set();
+        atomic_set(&arg.restart, 0);
+        arg.type = LDLM_BL_CALLBACK;
+
+        ast_count = 0;
         list_for_each_safe(tmp, pos, rpc_list) {
                 struct ldlm_lock *lock =
                         list_entry(tmp, struct ldlm_lock, l_bl_ast);
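struct ldlm_cb_set_arg is not defined in this file; judging from how it is initialized and consumed above, its layout is assumed to be roughly as follows (the declaration itself is an assumption, only the field names and their uses come from this patch):

struct ldlm_cb_set_arg {
        struct ptlrpc_request_set *set;     /* batched AST RPCs, flushed by
                                             * ldlm_send_and_maybe_create_set() */
        atomic_t                   restart; /* raised by an AST that needs the
                                             * caller to return -ERESTART */
        __u32                      type;    /* LDLM_BL_CALLBACK or
                                             * LDLM_CP_CALLBACK */
};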
@@ -1255,24 +1320,44 @@ int ldlm_run_bl_ast_work(struct list_head *rpc_list)
 
                 LDLM_LOCK_PUT(lock->l_blocking_lock);
                 lock->l_blocking_lock = NULL;
-                rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING);
-
-                if (rc == -ERESTART)
-                        retval = rc;
-                else if (rc)
-                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
-                               "disconnect client\n");
+                rc = lock->l_blocking_ast(lock, &d, (void *)&arg, 
+                                          LDLM_CB_BLOCKING);
                 LDLM_LOCK_PUT(lock);
+                ast_count++;
+
+                /* Send the request set once it reaches PARALLEL_AST_LIMIT,
+                 * and create a new set for the requests remaining in
+                 * @rpc_list */
+                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
+                        ldlm_send_and_maybe_create_set(&arg, 1);
+                        ast_count = 0;
+                }
         }
-        RETURN(retval);
+
+        if (ast_count > 0)
+                ldlm_send_and_maybe_create_set(&arg, 0);
+        else
+                /* If the number of ASTs is a multiple of
+                 * PARALLEL_AST_LIMIT, or if @rpc_list was initially empty,
+                 * @arg.set must be destroyed here, otherwise we get a
+                 * memory leak. */
+                ptlrpc_set_destroy(arg.set);
+
+        RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
 }
 
 int ldlm_run_cp_ast_work(struct list_head *rpc_list)
 {
+        struct ldlm_cb_set_arg arg;
         struct list_head *tmp, *pos;
-        int rc = 0, retval = 0;
+        int ast_count;
+        int rc = 0;
         ENTRY;
 
+        arg.set = ptlrpc_prep_set();
+        atomic_set(&arg.restart, 0);
+        arg.type = LDLM_CP_CALLBACK;
+
         /* It's possible to receive a completion AST before we've set
          * the l_completion_ast pointer: either because the AST arrived
          * before the reply, or simply because there's a small race
@@ -1284,6 +1369,7 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list)
          * reader/writer reference, which we won't do until we get the
          * reply and finish enqueueing. */
 
+        ast_count = 0;
         list_for_each_safe(tmp, pos, rpc_list) {
                 struct ldlm_lock *lock =
                         list_entry(tmp, struct ldlm_lock, l_cp_ast);
@@ -1295,16 +1381,31 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list)
                 lock->l_flags &= ~LDLM_FL_CP_REQD;
                 unlock_res_and_lock(lock);
 
-                if (lock->l_completion_ast != NULL)
-                        rc = lock->l_completion_ast(lock, 0, 0);
-                if (rc == -ERESTART)
-                        retval = rc;
-                else if (rc)
-                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
-                               "disconnect client\n");
+                if (lock->l_completion_ast != NULL) {
+                        rc = lock->l_completion_ast(lock, 0, (void *)&arg);
+                        ast_count++;
+                }
                 LDLM_LOCK_PUT(lock);
+
+                /* Send the request set once it reaches PARALLEL_AST_LIMIT,
+                 * and create a new set for the requests remaining in
+                 * @rpc_list */
+                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
+                        ldlm_send_and_maybe_create_set(&arg, 1);
+                        ast_count = 0;
+                }
         }
-        RETURN(retval);
+
+        if (ast_count > 0)
+                ldlm_send_and_maybe_create_set(&arg, 0);
+        else
+                /* If the number of ASTs is a multiple of
+                 * PARALLEL_AST_LIMIT, or if @rpc_list was initially empty,
+                 * @arg.set must be destroyed here, otherwise we get a
+                 * memory leak. */
+                ptlrpc_set_destroy(arg.set);
+
+        RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
@@ -1354,7 +1455,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         ENTRY;
 
         /* Local lock trees don't get reprocessed. */
-        if (res->lr_namespace->ns_client) {
+        if (ns_is_client(res->lr_namespace)) {
                 EXIT;
                 return;
         }
@@ -1473,6 +1574,13 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ldlm_del_waiting_lock(lock); 
         ldlm_resource_unlink_lock(lock);
         ldlm_lock_destroy_nolock(lock);
+
+        if (lock->l_granted_mode == lock->l_req_mode)
+                ldlm_pool_del(&ns->ns_pool, lock);
+
+        /* We may be called again for the same lock; zero out l_granted_mode
+         * so that ldlm_pool_del() above is not run a second time */
+        lock->l_granted_mode = 0;
         unlock_res_and_lock(lock);
 
         EXIT;
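Together with the ldlm_pool_add() call added to ldlm_grant_lock() above, this keeps the namespace pool's accounting of granted locks balanced: a lock enters the pool when granted and leaves it exactly once when cancelled. Zeroing l_granted_mode makes a repeated cancel harmless, because the l_granted_mode == l_req_mode guard no longer holds. A purely illustrative sketch of the guard's effect (hypothetical, not part of the patch):

static void example_cancel_twice(struct ldlm_lock *lock)
{
        ldlm_lock_cancel(lock); /* granted lock: ldlm_pool_del() runs, then
                                 * l_granted_mode is zeroed */
        ldlm_lock_cancel(lock); /* l_granted_mode != l_req_mode now, so the
                                 * pool counter is not decremented again */
}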
@@ -1574,7 +1682,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
         ldlm_resource_unlink_lock(lock);
 
         /* If this is a local resource, put it on the appropriate list. */
-        if (res->lr_namespace->ns_client) {
+        if (ns_is_client(res->lr_namespace)) {
                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
                 } else {