LCF_ASYNC = 0x1, /* Cancel locks asynchronously. */
LCF_LOCAL = 0x2, /* Cancel locks locally, not notifying the server */
LCF_BL_AST = 0x4, /* Cancel LDLM_FL_BL_AST locks in the same RPC */
- LCF_ONE_LOCK = 0x8, /* Cancel locks pack only one lock. */
};
struct ldlm_flock {
union ldlm_policy_data *policy,
enum ldlm_mode mode,
enum ldlm_cancel_flags flags, void *opaque);
-int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
- int count, enum ldlm_cancel_flags flags);
+int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
+ struct list_head *head, int count,
+ enum ldlm_cancel_flags flags);
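
The new prototype replaces the untyped \a ptr with an explicit single-lock argument and a list argument, of which a caller passes exactly one (the LASSERT added below enforces this). A minimal usage sketch, assuming the usual ldlm types are in scope; cancel_req_sketch() and its parameters are illustrative only, not part of the patch:

static void cancel_req_sketch(struct obd_export *exp, struct ldlm_lock *lock,
			      struct list_head *cancels, int count,
			      enum ldlm_cancel_flags flags)
{
	/* One-off cancel of a single lock, e.g. one that received a BL_AST
	 * after it had already been picked up for a batched cancel. */
	ldlm_cli_cancel_req(exp, lock, NULL, 1, flags);

	/* Batched cancel of locks previously collected on the list. */
	ldlm_cli_cancel_req(exp, NULL, cancels, count, flags);
}
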
int ldlm_cancel_resource_local(struct ldlm_resource *res,
struct list_head *cancels,
union ldlm_policy_data *policy,
* Prepare and send a batched cancel RPC. It will include \a count lock
- * handles of locks given in \a cancels list.
+ * handles of either the single \a lock or the locks given in the \a head
+ * list; at most one of the two arguments may be non-NULL.
*/
-int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
- int count, enum ldlm_cancel_flags flags)
+int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
+ struct list_head *head, int count,
+ enum ldlm_cancel_flags flags)
{
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
LASSERT(exp != NULL);
LASSERT(count > 0);
+ LASSERT(head == NULL || lock == NULL);
CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);
req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
ptlrpc_at_set_req_timeout(req);
- if (flags & LCF_ONE_LOCK)
- rc = _ldlm_cancel_pack(req, ptr, NULL, count);
- else
- rc = _ldlm_cancel_pack(req, NULL, ptr, count);
+ rc = _ldlm_cancel_pack(req, lock, head, count);
if (rc == 0) {
ptlrpc_req_finished(req);
sent = count;
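
With the single lock and the list passed as separate, typed arguments, the LCF_ONE_LOCK flag is no longer needed to tell the pack step which argument the old \a ptr held. A rough sketch of the dispatch this enables; this is a hypothetical shape only, not the actual _ldlm_cancel_pack(), and pack_one_handle() is a placeholder:

/* Placeholder, not a Lustre function: pack one lock handle into req. */
static int pack_one_handle(struct ptlrpc_request *req, struct ldlm_lock *lock);

static int cancel_pack_sketch(struct ptlrpc_request *req,
			      struct ldlm_lock *lock,
			      struct list_head *head, int count)
{
	/* At most one of lock/head is non-NULL, so branch on that rather
	 * than on a caller-supplied flag. */
	if (lock != NULL)
		return pack_one_handle(req, lock);

	/* Walk the locks queued for cancellation; ldlm cancel lists link
	 * locks through l_bl_ast (member name assumed here). */
	list_for_each_entry(lock, head, l_bl_ast)
		pack_one_handle(req, lock);
	return count;
}
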
enum ldlm_cancel_flags flags)
{
struct obd_export *exp;
- int avail, count = 1, bl_ast = 0;
+ int avail, count = 1, separate = 0;
+ enum ldlm_lru_flags lru_flags = 0;
__u64 rc = 0;
struct ldlm_namespace *ns;
struct ldlm_lock *lock;
LDLM_LOCK_RELEASE(lock);
RETURN(0);
}
- bl_ast = 1;
+ if (ldlm_is_canceling(lock))
+ separate = 1;
} else if (ldlm_is_canceling(lock)) {
/* Lock is being canceled and the caller doesn't want to wait */
if (flags & LCF_ASYNC) {
if (rc == LDLM_FL_LOCAL_ONLY || flags & LCF_LOCAL) {
LDLM_LOCK_RELEASE(lock);
RETURN(0);
+ } else if (rc == LDLM_FL_BL_AST) {
+ /* BL_AST lock must not wait. */
+ lru_flags |= LDLM_LRU_FLAG_NO_WAIT;
}
exp = lock->l_conn_export;
- if (bl_ast) { /* Send RPC immedaitly for LDLM_FL_BL_AST */
- ldlm_cli_cancel_req(exp, lock, count, flags | LCF_ONE_LOCK);
+	/* If a lock has been taken from the LRU for a batched cancel and a
+	 * BL_AST arrived for it later, send a CANCEL RPC for it individually
+	 * right away, without waiting for the batch to be handled. */
+ if (separate) {
+ ldlm_cli_cancel_req(exp, lock, NULL, 1, flags);
LDLM_LOCK_RELEASE(lock);
RETURN(0);
}
ns = ldlm_lock_to_ns(lock);
count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
- LCF_BL_AST, 0);
+ LCF_BL_AST, lru_flags);
}
ldlm_cli_cancel_list(&cancels, count, NULL, flags);
}
if (bl_ast > 0) {
count -= bl_ast;
- ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
+ ldlm_cli_cancel_list(&head, bl_ast, NULL, cancel_flags);
}
RETURN(count);
ldlm_cancel_pack(req, cancels, count);
else
res = ldlm_cli_cancel_req(lock->l_conn_export,
- cancels, count,
+ NULL, cancels, count,
flags);
} else {
res = ldlm_cli_cancel_req(lock->l_conn_export,
- cancels, 1, flags);
+ NULL, cancels, 1, flags);
}
if (res < 0) {