* rpc right on enqueue, what will make it slower, vs.
* asynchronous rpc in blocking thread. */
count += ldlm_cancel_lru_local(ns, cancels, cancel,
- avail - count, flags);
+ avail - count, 0, flags);
size[DLM_LOCKREQ_OFF] =
ldlm_request_bufsize(count, LDLM_ENQUEUE);
}
int ldlm_cli_cancel(struct lustre_handle *lockh)
{
+ int avail, flags, count = 1, rc = 0;
+ struct ldlm_namespace *ns;
struct ldlm_lock *lock;
CFS_LIST_HEAD(cancels);
- int rc = 0;
ENTRY;
/* concurrent cancels on the same handle can happen */
}
rc = ldlm_cli_cancel_local(lock);
+ if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) {
+ LDLM_LOCK_PUT(lock);
+ RETURN(rc < 0 ? rc : 0);
+ }
+ /* Even if the lock is marked as LDLM_FL_BL_AST, this is an LDLM_CANCEL
+ * RPC, which goes to the canceld portal, so we can cancel other LRU locks
+ * here and send them all as one LDLM_CANCEL RPC. */
+ LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, &cancels);
+ avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+ LASSERT(avail > 0);
- if (rc == LDLM_FL_BL_AST) {
- rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1, 0);
- } else if (rc == LDLM_FL_CANCELING) {
- struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
- int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
- int flags, cancel;
- LASSERT(avail > 0);
-
- flags = ns_connect_lru_resize(ns) ?
- LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
- cancel = ns_connect_lru_resize(ns) ? 0 : 1;
-
- cancel += ldlm_cancel_lru_local(ns, &cancels, 0,
- avail - cancel, flags);
- ldlm_cli_cancel_list(&cancels, cancel, NULL, 0, 0);
- }
- if (rc != LDLM_FL_CANCELING)
- LDLM_LOCK_PUT(lock);
- RETURN(rc < 0 ? rc : 0);
+ ns = lock->l_resource->lr_namespace;
+ flags = ns_connect_lru_resize(ns) ? LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
+ count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - count,
+ LDLM_FL_BL_AST, flags);
+ ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+ RETURN(0);
}
/* XXX until we will have compound requests and can cut cancels from generic rpc
* we need send cancels with LDLM_FL_BL_AST flag as separate rpc */
-static int ldlm_cancel_list(struct list_head *cancels, int count)
+static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
{
CFS_LIST_HEAD(head);
struct ldlm_lock *lock, *next;
if (left-- == 0)
break;
- rc = ldlm_cli_cancel_local(lock);
- if (rc == LDLM_FL_BL_AST) {
+ if (flags & LDLM_FL_LOCAL_ONLY) {
+ rc = LDLM_FL_LOCAL_ONLY;
+ ldlm_lock_cancel(lock);
+ } else {
+ rc = ldlm_cli_cancel_local(lock);
+ }
+ if (!(flags & LDLM_FL_BL_AST) && (rc == LDLM_FL_BL_AST)) {
LDLM_DEBUG(lock, "Cancel lock separately");
list_del_init(&lock->l_bl_ast);
list_add(&lock->l_bl_ast, &head);
}
}
- if(bl_ast > 0) {
+ if (bl_ast > 0) {
count -= bl_ast;
ldlm_cli_cancel_list(&head, bl_ast, NULL, 0, 0);
}
RETURN(count);
}
-
-/* cancel lock list without sending rpc to server*/
-static int ldlm_cancel_list_local(struct list_head *cancels, int count)
-{
- struct ldlm_lock *lock, *next;
- int left = 0;
-
- left = count;
- list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
- if (left-- == 0)
- break;
- ldlm_lock_cancel(lock);
- /* CANCEL RPC should not be sent to server. */
- list_del_init(&lock->l_bl_ast);
- LDLM_LOCK_PUT(lock);
- count--;
- }
- RETURN(count);
-}
-
/* Return 1 if @lock should be canceled according to shrinker policy.
* Return zero otherwise. */
static int ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
* memory pressre policy function.
*/
int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
- int count, int max, int flags)
+ int count, int max, int cancel_flags, int flags)
{
ldlm_cancel_lru_policy_t cancel_lru_policy_func;
int added = 0, unused, cancel;
LASSERT(!lock->l_readers && !lock->l_writers);
/* If we have chosen to cancel this lock voluntarily, we
- * better send cancel notification to server, so that it
+ * better send cancel notification to server, so that it
* frees appropriate state. This might lead to a race
* where while we are doing cancel here, server is also
* silently cancelling this lock. */
lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
- /* Setting the CBPENDING flag is a little misleading, but
- * prevents an important race; namely, once CBPENDING is
- * set, the lock can accumulate no more readers/writers.
- * Since readers and writers are already zero here,
- * ldlm_lock_decref() won't see this flag and call
- * l_blocking_ast */
+ /* Setting the CBPENDING flag is a little misleading,
+ * but prevents an important race; namely, once
+ * CBPENDING is set, the lock can accumulate no more
+ * readers/writers. Since readers and writers are
+ * already zero here, ldlm_lock_decref() won't see
+ * this flag and call l_blocking_ast */
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
- /* We can't re-add to l_lru as it confuses the refcounting
- * in ldlm_lock_remove_from_lru() if an AST arrives after
- * we drop ns_lock below. We use l_bl_ast and can't use
- * l_pending_chain as it is used both on server and client
- * nevertheless bug 5666 says it is used only on server */
+ /* We can't re-add to l_lru as it confuses the
+ * refcounting in ldlm_lock_remove_from_lru() if an AST
+ * arrives after we drop ns_lock below. We use l_bl_ast
+ * and can't use l_pending_chain as it is used both on
+ * server and client; nevertheless, bug 5666 says it is
+ * used only on the server */
LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, cancels);
unlock_res_and_lock(lock);
if (cancels == NULL)
RETURN(added);
- RETURN(ldlm_cancel_list(cancels, added));
+ RETURN(ldlm_cancel_list(cancels, added, cancel_flags));
}
/* when called with LDLM_ASYNC the blocking callback will be handled
#ifndef __KERNEL__
sync = LDLM_SYNC; /* force to be sync in user space */
#endif
- count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, flags);
+ count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags);
if (sync == LDLM_ASYNC) {
rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
if (rc == 0)
struct list_head *cancels,
ldlm_policy_data_t *policy,
ldlm_mode_t mode, int lock_flags,
- int flags, void *opaque)
+ int cancel_flags, void *opaque)
{
struct ldlm_lock *lock;
int count = 0;
}
if (lock->l_readers || lock->l_writers) {
- if (flags & LDLM_FL_WARN) {
+ if (cancel_flags & LDLM_FL_WARN) {
LDLM_ERROR(lock, "lock in use");
//LBUG();
}
}
unlock_res(res);
- if ((flags & LDLM_FL_LOCAL_ONLY))
- RETURN(ldlm_cancel_list_local(cancels, count));
-
- RETURN(ldlm_cancel_list(cancels, count));
+ RETURN(ldlm_cancel_list(cancels, count, cancel_flags));
}
/* If @req is NULL, send CANCEL request to server with handles of locks