diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 2b34473..760fd58 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -528,7 +528,7 @@ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                  * rpc right on enqueue, what will make it slower, vs. 
                  * asynchronous rpc in blocking thread. */
                 count += ldlm_cancel_lru_local(ns, cancels, cancel,
-                                               avail - count, flags);
+                                               avail - count, 0, flags);
                 size[DLM_LOCKREQ_OFF] =
                         ldlm_request_bufsize(count, LDLM_ENQUEUE);
         }
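Note: this hunk only adapts the enqueue-path call site to the widened
ldlm_cancel_lru_local() signature introduced further down; the inserted 0 is
the new cancel_flags argument (no special cancel behaviour is wanted while
preparing an enqueue request). The resulting prototype, as implied by the
signature hunk below:

        int ldlm_cancel_lru_local(struct ldlm_namespace *ns,
                                  struct list_head *cancels, int count, int max,
                                  int cancel_flags, int flags);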
@@ -1013,9 +1013,10 @@ EXPORT_SYMBOL(ldlm_cli_update_pool);
 
 int ldlm_cli_cancel(struct lustre_handle *lockh)
 {
+        int avail, flags, count = 1, rc = 0;
+        struct ldlm_namespace *ns;
         struct ldlm_lock *lock;
         CFS_LIST_HEAD(cancels);
-        int rc = 0;
         ENTRY;
 
         /* concurrent cancels on the same handle can happen */
@@ -1026,32 +1027,29 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         }
 
         rc = ldlm_cli_cancel_local(lock);
+        if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) {
+                LDLM_LOCK_PUT(lock);
+                RETURN(rc < 0 ? rc : 0);
+        }
+        /* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
+         * rpc which goes to canceld portal, so we can cancel other lru locks
+         * here and send them all as one LDLM_CANCEL rpc. */
+        LASSERT(list_empty(&lock->l_bl_ast));
         list_add(&lock->l_bl_ast, &cancels);
+        avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+        LASSERT(avail > 0);
 
-        if (rc == LDLM_FL_BL_AST) {
-                rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1, 0);
-        } else if (rc == LDLM_FL_CANCELING) {
-                struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
-                int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
-                int flags, cancel;
-                LASSERT(avail > 0);
-                
-                flags = ns_connect_lru_resize(ns) ? 
-                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
-                cancel = ns_connect_lru_resize(ns) ? 0 : 1;
-                
-                cancel += ldlm_cancel_lru_local(ns, &cancels, 0, 
-                                                avail - cancel, flags);
-                ldlm_cli_cancel_list(&cancels, cancel, NULL, 0, 0);
-        }
-        if (rc != LDLM_FL_CANCELING)
-                LDLM_LOCK_PUT(lock);
-        RETURN(rc < 0 ? rc : 0);
+        ns = lock->l_resource->lr_namespace;
+        flags = ns_connect_lru_resize(ns) ? LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
+        count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - count,
+                                       LDLM_FL_BL_AST, flags);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        RETURN(0);
 }
 
 /* XXX until we will have compound requests and can cut cancels from generic rpc
  * we need send cancels with LDLM_FL_BL_AST flag as separate rpc */
-static int ldlm_cancel_list(struct list_head *cancels, int count)
+static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
 {
         CFS_LIST_HEAD(head);
         struct ldlm_lock *lock, *next;
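Note: ldlm_cli_cancel() previously sent a BL_AST-marked lock as a lone cancel
RPC and only batched LRU locks in the LDLM_FL_CANCELING case. Because an
LDLM_CANCEL RPC goes to the canceld portal either way, the rewritten function
always seeds @cancels with the lock being cancelled and then fills the batch
up to the handle budget. A condensed sketch of the new flow, assembled from
the added lines above:

        rc = ldlm_cli_cancel_local(lock);
        if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) {      /* nothing to send */
                LDLM_LOCK_PUT(lock);
                RETURN(rc < 0 ? rc : 0);
        }
        list_add(&lock->l_bl_ast, &cancels);           /* batch starts at count = 1 */
        avail = ldlm_cancel_handles_avail(lock->l_conn_export);
        count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - count,
                                       LDLM_FL_BL_AST, flags);
        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);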
@@ -1062,8 +1060,13 @@ static int ldlm_cancel_list(struct list_head *cancels, int count)
                 if (left-- == 0)
                         break;
 
-                rc = ldlm_cli_cancel_local(lock);
-                if (rc == LDLM_FL_BL_AST) {
+                if (flags & LDLM_FL_LOCAL_ONLY) {
+                        rc = LDLM_FL_LOCAL_ONLY;
+                        ldlm_lock_cancel(lock);
+                } else {
+                        rc = ldlm_cli_cancel_local(lock);
+                }
+                if (!(flags & LDLM_FL_BL_AST) && (rc == LDLM_FL_BL_AST)) {
                         LDLM_DEBUG(lock, "Cancel lock separately");
                         list_del_init(&lock->l_bl_ast);
                         list_add(&lock->l_bl_ast, &head);
@@ -1078,7 +1081,7 @@ static int ldlm_cancel_list(struct list_head *cancels, int count)
                 }
 
         }
-        if(bl_ast > 0) {
+        if (bl_ast > 0) {
                 count -= bl_ast;
                 ldlm_cli_cancel_list(&head, bl_ast, NULL, 0, 0);
         }
@@ -1086,26 +1089,6 @@ static int ldlm_cancel_list(struct list_head *cancels, int count)
         RETURN(count);
 }
 
-
-/* cancel lock list without sending rpc to server*/
-static int ldlm_cancel_list_local(struct list_head *cancels, int count)
-{
-        struct ldlm_lock *lock, *next;
-        int left = 0;
-
-        left = count;
-        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
-                if (left-- == 0)
-                        break;
-                ldlm_lock_cancel(lock);
-                /* CANCEL RPC should not be sent to server. */
-                list_del_init(&lock->l_bl_ast);
-                LDLM_LOCK_PUT(lock);
-                count--;
-        }
-        RETURN(count);
-}
-
 /* Return 1 if @lock should be canceled according to shrinker policy. 
  * Return zero otherwise. */
 static int ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
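Note: the deleted ldlm_cancel_list_local() helper is subsumed by the
LDLM_FL_LOCAL_ONLY handling above, so former callers convert mechanically:

        /* before this patch */
        count = ldlm_cancel_list_local(cancels, count);
        /* after this patch */
        count = ldlm_cancel_list(cancels, count, LDLM_FL_LOCAL_ONLY);

as the ldlm_cancel_resource_local() hunk at the end of this diff shows.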
@@ -1248,7 +1231,7 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
  *                              memory pressre policy function.
  */
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
-                          int count, int max, int flags)
+                          int count, int max, int cancel_flags, int flags)
 {
         ldlm_cancel_lru_policy_t cancel_lru_policy_func;
         int added = 0, unused, cancel;
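Note: the two flag parameters are distinct. @cancel_flags (LDLM_FL_LOCAL_ONLY
or LDLM_FL_BL_AST) is forwarded to ldlm_cancel_list() and controls how the
gathered locks are cancelled; @flags (LDLM_CANCEL_LRUR, LDLM_CANCEL_AGED, ...)
selects the policy that picks LRU victims. The ldlm_cli_cancel() hunk above
exercises both at once:

        count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - count,
                                       LDLM_FL_BL_AST, flags);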
@@ -1309,25 +1292,26 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                         LASSERT(!lock->l_readers && !lock->l_writers);
 
                         /* If we have chosen to cancel this lock voluntarily, we
-                         * better send cancel notification to server, so that it 
+                         * better send cancel notification to server, so that it
                          * frees appropriate state. This might lead to a race 
                          * where while we are doing cancel here, server is also 
                          * silently cancelling this lock. */
                         lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
 
-                        /* Setting the CBPENDING flag is a little misleading, but
-                         * prevents an important race; namely, once CBPENDING is
-                         * set, the lock can accumulate no more readers/writers.
-                         * Since readers and writers are already zero here, 
-                         * ldlm_lock_decref() won't see this flag and call 
-                         * l_blocking_ast */
+                        /* Setting the CBPENDING flag is a little misleading,
+                         * but prevents an important race; namely, once
+                         * CBPENDING is set, the lock can accumulate no more
+                         * readers/writers. Since readers and writers are
+                         * already zero here, ldlm_lock_decref() won't see
+                         * this flag and call l_blocking_ast */
                         lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
 
-                        /* We can't re-add to l_lru as it confuses the refcounting
-                         * in ldlm_lock_remove_from_lru() if an AST arrives after 
-                         * we drop ns_lock below. We use l_bl_ast and can't use 
-                         * l_pending_chain as it is used both on server and client
-                         * nevertheless bug 5666 says it is used only on server */
+                        /* We can't re-add to l_lru as it confuses the
+                         * refcounting in ldlm_lock_remove_from_lru() if an AST
+                         * arrives after we drop ns_lock below. We use l_bl_ast
+                         * and can't use l_pending_chain as it is used both on
+                         * server and client nevertheless bug 5666 says it is
+                         * used only on server */
                         LASSERT(list_empty(&lock->l_bl_ast));
                         list_add(&lock->l_bl_ast, cancels);
                         unlock_res_and_lock(lock);
@@ -1341,7 +1325,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
         if (cancels == NULL)
                 RETURN(added);
 
-        RETURN(ldlm_cancel_list(cancels, added));
+        RETURN(ldlm_cancel_list(cancels, added, cancel_flags));
 }
 
 /* when called with LDLM_ASYNC the blocking callback will be handled
@@ -1358,7 +1342,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
 #ifndef __KERNEL__
         sync = LDLM_SYNC; /* force to be sync in user space */
 #endif
-        count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, flags);
+        count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags);
         if (sync == LDLM_ASYNC) {
                 rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
                 if (rc == 0)
@@ -1378,7 +1362,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                                struct list_head *cancels,
                                ldlm_policy_data_t *policy,
                                ldlm_mode_t mode, int lock_flags,
-                               int flags, void *opaque)
+                               int cancel_flags, void *opaque)
 {
         struct ldlm_lock *lock;
         int count = 0;
@@ -1394,7 +1378,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                 }
 
                 if (lock->l_readers || lock->l_writers) {
-                        if (flags & LDLM_FL_WARN) {
+                        if (cancel_flags & LDLM_FL_WARN) {
                                 LDLM_ERROR(lock, "lock in use");
                                 //LBUG();
                         }
@@ -1428,10 +1412,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
         }
         unlock_res(res);
 
-       if ((flags & LDLM_FL_LOCAL_ONLY))
-               RETURN(ldlm_cancel_list_local(cancels, count));
-
-        RETURN(ldlm_cancel_list(cancels, count));
+        RETURN(ldlm_cancel_list(cancels, count, cancel_flags));
 }
 
 /* If @req is NULL, send CANCEL request to server with handles of locks 
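Note: after the last hunk, ldlm_cancel_resource_local() routes both the RPC
and the local-only cases through ldlm_cancel_list(), passing @cancel_flags
straight through. A hypothetical caller cancelling every unused lock on a
resource without notifying the server might look like:

        CFS_LIST_HEAD(cancels);
        int count;

        count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE,
                                           0, LDLM_FL_LOCAL_ONLY, NULL);

(LCK_MINMODE, i.e. mode 0, matches any granted mode; the NULL policy and
opaque arguments disable the extra filters.)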