dlm->lock_count = canceloff;
}
/* Pack into the request @pack lock handles. */
- ldlm_cli_cancel_list(cancels, pack, req, 0);
+ ldlm_cli_cancel_list(cancels, pack, NULL, req, 0);
/* Prepare and send separate cancel RPC for others. */
- ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
+ ldlm_cli_cancel_list(cancels, count - pack, NULL, NULL, 0);
} else {
ldlm_lock_list_put(cancels, l_bl_ast, count);
}
return 1;
}
-#define ldlm_cancel_pack(req, head, count) \
- _ldlm_cancel_pack(req, NULL, head, count)
+
/**
- * Pack \a count locks in \a head into ldlm_request buffer of request \a req.
+ * Pack \a count locks in \a lock and \a head into ldlm_request buffer of
+ * the request \a req.
*/
-static int _ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock,
- struct list_head *head, int count)
+static int ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock,
+ struct list_head *head, int count)
{
struct ldlm_request *dlm;
int max, packed = 0;
*/
if (lock) { /* only pack one lock */
packed = __ldlm_pack_lock(lock, dlm);
- } else {
+ count--;
+ }
+
+ if (count) { /* pack the list as well if given */
list_for_each_entry(lock, head, l_bl_ast) {
- if (!count--)
- break;
packed += __ldlm_pack_lock(lock, dlm);
+ if (!--count)
+ break;
}
}
+
CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
+ LASSERT(count == 0);
RETURN(packed);
}
/**
* Prepare and send a batched cancel RPC. It will include \a count lock
- * handles of locks given in \a cancels list.
+ * handles of locks given in \a lock and \a cancels list.
*/
int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
struct list_head *head, int count,
{
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
- int free, sent = 0;
+ int free;
int rc = 0;
ENTRY;
LASSERT(exp != NULL);
LASSERT(count > 0);
- LASSERT(head == NULL || lock == NULL);
+ LASSERT(head != NULL);
CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);
req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
ptlrpc_at_set_req_timeout(req);
- rc = _ldlm_cancel_pack(req, lock, head, count);
+ rc = ldlm_cancel_pack(req, lock, head, count);
if (rc == 0) {
ptlrpc_req_put(req);
- sent = count;
GOTO(out, rc);
}
ptlrpc_request_set_replen(req);
if (flags & LCF_ASYNC) {
ptlrpcd_add_req(req);
- sent = count;
- GOTO(out, 0);
+ GOTO(out, rc = 0);
}
rc = ptlrpc_queue_wait(req);
rc = 0;
} else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
req->rq_import_generation == imp->imp_generation) {
+ struct ldlm_lock *pos;
+
+ list_for_each_entry(pos, head, l_bl_ast) {
+ lock_res_and_lock(pos);
+ ldlm_clear_ast_sent(pos);
+ unlock_res_and_lock(pos);
+ }
+ if (lock) {
+ lock_res_and_lock(lock);
+ ldlm_clear_ast_sent(lock);
+ unlock_res_and_lock(lock);
+ }
+
ptlrpc_req_put(req);
continue;
} else if (rc != ELDLM_OK) {
rc);
break;
}
- sent = count;
break;
}
ptlrpc_req_put(req);
EXIT;
out:
- return sent ? sent : rc;
+ return rc ? : count;
}
static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
enum ldlm_cancel_flags flags)
{
struct obd_export *exp;
- int avail, count = 1, separate = 0;
+ int avail, count = 1;
enum ldlm_lru_flags lru_flags = 0;
__u64 rc = 0;
struct ldlm_namespace *ns;
ldlm_lock_put(lock);
RETURN(0);
}
- if (ldlm_is_canceling(lock))
- separate = 1;
} else if (ldlm_is_canceling(lock)) {
/* Lock is being canceled and the caller doesn't want to wait */
if (flags & LCF_ASYNC) {
}
exp = lock->l_conn_export;
- /* If a lock has been taken from lru for a batched cancel and a later
- * BL_AST came, send a CANCEL RPC individually for it right away, not
- * waiting for the batch to be handled. */
- if (separate) {
- ldlm_cli_cancel_req(exp, lock, NULL, 1, flags);
- ldlm_lock_put(lock);
- RETURN(0);
- }
-
- LASSERT(list_empty(&lock->l_bl_ast));
- list_add(&lock->l_bl_ast, &cancels);
/*
* This is a LDLM_CANCEL RPC which goes to canceld portal,
* so we can cancel other LRU locks here and send them all
count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
LCF_BL_AST, lru_flags);
}
- ldlm_cli_cancel_list(&cancels, count, NULL, flags);
+ ldlm_cli_cancel_list(&cancels, count, lock, NULL, flags);
RETURN(0);
}
}
if (bl_ast > 0) {
count -= bl_ast;
- ldlm_cli_cancel_list(&head, bl_ast, NULL, cancel_flags);
+ ldlm_cli_cancel_list(&head, bl_ast, NULL, NULL, cancel_flags);
}
RETURN(count);
EXPORT_SYMBOL(ldlm_cancel_resource_local);
/**
- * Cancel client-side locks from a list and send/prepare cancel RPCs to the
- * server.
- * If \a req is NULL, send CANCEL request to server with handles of locks
- * in the \a cancels. If EARLY_CANCEL is not supported, send CANCEL requests
- * separately per lock.
- * If \a req is not NULL, put handles of locks in \a cancels into the request
- * buffer at the offset \a off.
- * Destroy \a cancels at the end.
+ * Cancel client-side locks and send/prepare cancel RPCs to the server.
+ *
+ * Locks are passed as a \a cancels list of locks and/or as a \a primary lock.
+ *
+ * \a count must not be larger than the total number of locks passed in \a
+ * primary and \a cancels. Thus, it includes the \a primary (if passed), but
+ * may not include all the locks in \a cancels (if passed).
+ *
+ * If \a req is NULL, a new CANCEL RPC is created and sent with \a count lock
+ * handles packed into it, starting with the \a primary one.
+ * If EARLY_CANCEL is not supported, \a count separate CANCEL RPCs are created
+ * and sent, with one lock handle in each request.
+ *
+ * If \a req is not NULL, \a count lock handles just need to be placed into
+ * the RMF_DLM_REQ buffer of the given req.
+ *
+ * Destroy \a count of locks at the end, starting with the \a primary one.
*/
int ldlm_cli_cancel_list(struct list_head *cancels, int count,
+ struct ldlm_lock *primary,
struct ptlrpc_request *req,
enum ldlm_cancel_flags flags)
{
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock = primary;
+ struct obd_export *export;
int res = 0;
-
ENTRY;
- if (list_empty(cancels) || count == 0)
+ if (count == 0)
RETURN(0);
+ LASSERT(primary || !list_empty(cancels));
+ if (lock == NULL)
+ lock = list_first_entry(cancels, struct ldlm_lock, l_bl_ast);
+ export = lock->l_conn_export;
+ LASSERT(export != NULL);
+
/*
* XXX: requests (both batched and not) could be sent in parallel.
* Usually it is enough to have just 1 RPC, but it is possible that
* the feature.
*/
while (count > 0) {
- LASSERT(!list_empty(cancels));
- lock = list_first_entry(cancels, struct ldlm_lock, l_bl_ast);
- LASSERT(lock->l_conn_export);
-
- if (exp_connect_cancelset(lock->l_conn_export)) {
- res = count;
+ LASSERT(primary || !list_empty(cancels));
+ if (exp_connect_cancelset(export)) {
if (req)
- ldlm_cancel_pack(req, cancels, count);
+ res = ldlm_cancel_pack(req, primary,
+ cancels, count);
else
- res = ldlm_cli_cancel_req(lock->l_conn_export,
- NULL, cancels, count,
- flags);
+ res = ldlm_cli_cancel_req(export,
+ primary, cancels,
+ count, flags);
} else {
- res = ldlm_cli_cancel_req(lock->l_conn_export,
- NULL, cancels, 1, flags);
+ res = ldlm_cli_cancel_req(export, primary,
+ cancels, 1, flags);
}
if (res < 0) {
}
count -= res;
+ if (primary && res) {
+ ldlm_lock_put(primary);
+ primary = NULL;
+ res--;
+ }
ldlm_lock_list_put(cancels, l_bl_ast, res);
}
LASSERT(count == 0);
count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
0, flags | LCF_BL_AST, opaque);
- rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
+ rc = ldlm_cli_cancel_list(&cancels, count, NULL, NULL, flags);
if (rc != ELDLM_OK)
CERROR("canceling unused lock "DLDLMRES": rc = %d\n",
PLDLMRES(res), rc);