* bug 7311). */
(LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
lock_res_and_lock(lock);
- lock->l_flags |= LDLM_FL_CBPENDING;
+ lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
unlock_res_and_lock(lock);
LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
}
struct ptlrpc_request *req;
CFS_LIST_HEAD(head);
ENTRY;
-
+
if (cancels == NULL)
cancels = &head;
if (exp_connect_cancelset(exp)) {
int avail = ldlm_req_handles_avail(exp, size, bufcount,
LDLM_ENQUEUE_CANCEL_OFF);
LASSERT(avail >= count);
-
+
/* Cancel lru locks here _only_ if the server supports
* EARLY_CANCEL. Otherwise we have to send extra CANCEL
* rpc right on enqueue, what will make it slower, vs.
LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
RETURN(0);
}
-
+
rc = ldlm_cli_cancel_local(lock);
list_add(&lock->l_bl_ast, &cancels);
RETURN(rc < 0 ? rc : 0);
}
+/* XXX: until compound requests exist and cancels can be cut from a generic
+ * RPC, locks whose local cancel returns LDLM_FL_BL_AST must be sent to the
+ * server in a separate CANCEL RPC.
+ *
+ * Cancel up to @count locks from @cancels.  Locks cancelled purely locally
+ * are unlinked and their references dropped; BL_AST locks are batched onto
+ * a private list and cancelled via ldlm_cli_cancel_list().  Returns the
+ * number of locks remaining on @cancels for the caller to handle. */
+static int ldlm_cancel_list(struct list_head *cancels, int count)
+{
+        CFS_LIST_HEAD(head);
+        struct ldlm_lock *lock, *next;
+        int left = count, bl_ast = 0, rc;
+
+        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
+                if (left-- == 0)
+                        break;
+
+                rc = ldlm_cli_cancel_local(lock);
+                if (rc == LDLM_FL_BL_AST) {
+                        /* Needs its own RPC: move to @head for the batched
+                         * send below. */
+                        LDLM_DEBUG(lock, "Cancel lock separately");
+                        list_del_init(&lock->l_bl_ast);
+                        list_add(&lock->l_bl_ast, &head);
+                        bl_ast++;
+                        continue;
+                }
+                if (rc == LDLM_FL_LOCAL_ONLY) {
+                        /* CANCEL RPC should not be sent to server. */
+                        list_del_init(&lock->l_bl_ast);
+                        LDLM_LOCK_PUT(lock);
+                        count--;
+                }
+        }
+        if (bl_ast > 0) {
+                count -= bl_ast;
+                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0, 0);
+        }
+
+        RETURN(count);
+}
+
+
+/* Cancel up to @count locks from @cancels purely locally, without sending
+ * any RPC to the server.  Each cancelled lock is unlinked from the list and
+ * its reference dropped.  Returns the number of locks on @cancels that were
+ * not processed (non-zero only if the list is shorter than @count). */
+static int ldlm_cancel_list_local(struct list_head *cancels, int count)
+{
+        struct ldlm_lock *lock, *next;
+        int left = count;
+
+        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
+                if (left-- == 0)
+                        break;
+                ldlm_lock_cancel(lock);
+                /* CANCEL RPC should not be sent to server. */
+                list_del_init(&lock->l_bl_ast);
+                LDLM_LOCK_PUT(lock);
+                count--;
+        }
+        RETURN(count);
+}
+
/* - Free space in lru for @count new locks,
* redundant unused locks are canceled locally;
* - also cancel locally unused aged locks;
int count, int max, int flags)
{
cfs_time_t cur = cfs_time_current();
- int rc, added = 0, left, unused;
- struct ldlm_lock *lock, *next;
+ int added = 0, unused;
+ struct ldlm_lock *lock;
__u64 slv, lvf, lv;
ENTRY;
spin_lock(&ns->ns_unused_lock);
unused = ns->ns_nr_unused;
-
+
if (!ns_connect_lru_resize(ns))
count += unused - ns->ns_max_unused;
if (ns_connect_lru_resize(ns)) {
cfs_time_t la;
-
+
/* Take into account SLV only if count == 0. */
if (count == 0) {
/* Calculate lv for every lock. */
lock->l_last_used));
if (la == 0)
la = 1;
-
+
/* Stop when slv is not yet come from server
* or lv is smaller than it is. */
lv = lvf * la * unused;
}
spin_unlock(&ns->ns_unused_lock);
- /* Handle only @added inserted locks. */
- left = added;
- list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
- if (left-- == 0)
- break;
-
- rc = ldlm_cli_cancel_local(lock);
- if (rc == LDLM_FL_BL_AST) {
- CFS_LIST_HEAD(head);
-
- LDLM_DEBUG(lock, "Cancel lock separately");
- list_del_init(&lock->l_bl_ast);
- list_add(&lock->l_bl_ast, &head);
- ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
- rc = LDLM_FL_LOCAL_ONLY;
- }
- if (rc == LDLM_FL_LOCAL_ONLY) {
- /* CANCEL RPC should not be sent to server. */
- list_del_init(&lock->l_bl_ast);
- LDLM_LOCK_PUT(lock);
- added--;
- }
-
- }
- RETURN(added);
+ RETURN(ldlm_cancel_list(cancels, added));
}
/* when called with LDLM_ASYNC the blocking callback will be handled
ldlm_mode_t mode, int lock_flags,
int flags, void *opaque)
{
- struct ldlm_lock *lock, *next;
- int count = 0, left;
+ struct ldlm_lock *lock;
+ int count = 0;
ENTRY;
lock_res(res);
}
unlock_res(res);
- /* Handle only @count inserted locks. */
- left = count;
- list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
- int rc = LDLM_FL_LOCAL_ONLY;
-
- if (left-- == 0)
- break;
- if (flags & LDLM_FL_LOCAL_ONLY)
- ldlm_lock_cancel(lock);
- else
- rc = ldlm_cli_cancel_local(lock);
-
- if (rc == LDLM_FL_BL_AST) {
- CFS_LIST_HEAD(head);
+ if ((flags & LDLM_FL_LOCAL_ONLY))
+ RETURN(ldlm_cancel_list_local(cancels, count));
- LDLM_DEBUG(lock, "Cancel lock separately");
- list_del_init(&lock->l_bl_ast);
- list_add(&lock->l_bl_ast, &head);
- ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
- rc = LDLM_FL_LOCAL_ONLY;
- }
- if (rc == LDLM_FL_LOCAL_ONLY) {
- /* CANCEL RPC should not be sent to server. */
- list_del_init(&lock->l_bl_ast);
- LDLM_LOCK_PUT(lock);
- count--;
- }
- }
- RETURN(count);
+ RETURN(ldlm_cancel_list(cancels, count));
}
/* If @req is NULL, send CANCEL request to server with handles of locks