* bug 7311). */
(LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
lock_res_and_lock(lock);
- lock->l_flags |= LDLM_FL_CBPENDING;
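+ /* The enqueue reply carries a blocking AST; flag the lock so that
+ * its eventual cancel is sent to the server as a separate RPC. */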
+ lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
unlock_res_and_lock(lock);
LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
}
{
int rc = LDLM_FL_LOCAL_ONLY;
ENTRY;
-
+
if (lock->l_conn_export) {
int local_only;
LASSERT(dlm != NULL);
/* Check the room in the request buffer. */
- max = lustre_msg_buflen(req->rq_reqmsg, off) -
+ max = lustre_msg_buflen(req->rq_reqmsg, off) -
sizeof(struct ldlm_request);
max /= sizeof(struct lustre_handle);
max += LDLM_LOCKREQ_HANDLES;
free = ldlm_req_handles_avail(exp, size, 2, 0);
if (count > free)
count = free;
-
+
size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
while (1) {
imp = class_exp2cliimp(exp);
sent = count;
break;
}
-
+
ptlrpc_req_finished(req);
EXIT;
out:
}
-/* - Free space in lru for @count new locks,
+/* XXX: until we have compound requests and can cut cancels out of generic
+ * RPCs, we need to send cancels carrying the LDLM_FL_BL_AST flag as
+ * separate RPCs. */
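+/* Returns the number of locks on @cancels that still need a CANCEL RPC
+ * sent to the server. */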
+static int ldlm_cancel_list(struct list_head *cancels, int count)
+{
+ CFS_LIST_HEAD(head);
+ struct ldlm_lock *lock, *next;
+ int left = count, bl_ast = 0, rc;
+
+ list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
+ if (left-- == 0)
+ break;
+
+ rc = ldlm_cli_cancel_local(lock);
+ if (rc == LDLM_FL_BL_AST) {
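+ /* This cancel cannot ride in a generic CANCEL RPC; collect the
+ * lock on a private list and send it separately below. */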
+ LDLM_DEBUG(lock, "Cancel lock separately");
+ list_del_init(&lock->l_bl_ast);
+ list_add(&lock->l_bl_ast, &head);
+ bl_ast++;
+ continue;
+ }
+ if (rc == LDLM_FL_LOCAL_ONLY) {
+ /* CANCEL RPC should not be sent to server. */
+ list_del_init(&lock->l_bl_ast);
+ LDLM_LOCK_PUT(lock);
+ count--;
+ }
+ }
+ if (bl_ast > 0) {
+ count -= bl_ast;
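+ /* These cancels go to the server separately from any generic RPC. */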
+ ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
+ }
+
+ RETURN(count);
+}
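+
+/* Typical usage (a sketch, assuming a caller trimming its LRU): cancel
+ * locally what we can, then batch the remaining handles into one CANCEL
+ * RPC, e.g.:
+ *
+ * count = ldlm_cancel_lru_local(ns, &cancels, 0, max, 0);
+ * ldlm_cli_cancel_list(&cancels, count, NULL, 0);
+ */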
+
+/* Cancel a list of locks locally, without sending any RPCs to the server. */
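+/* Returns the number of requested cancels that were not carried out. */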
+static int ldlm_cancel_list_local(struct list_head *cancels, int count)
+{
+ struct ldlm_lock *lock, *next;
+ int left = count;
+
+ list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
+ if (left-- == 0)
+ break;
+ ldlm_lock_cancel(lock);
+ /* CANCEL RPC should not be sent to server. */
+ list_del_init(&lock->l_bl_ast);
+ LDLM_LOCK_PUT(lock);
+ count--;
+ }
+ RETURN(count);
+}
+
+/* - Free space in lru for @count new locks,
* redundant unused locks are canceled locally;
* - also cancel locally unused aged locks;
* - do not cancel more than @max locks;
int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
int count, int max, int flags)
{
- int rc, added = 0, left, unused;
+ int added = 0, unused;
cfs_time_t cur = cfs_time_current();
- struct ldlm_lock *lock, *next;
+ struct ldlm_lock *lock;
ENTRY;
spin_lock(&ns->ns_unused_lock);
unused = ns->ns_nr_unused;
-
+
if (!ns_connect_lru_resize(ns))
count += unused - ns->ns_max_unused;
-
+
while (!list_empty(&ns->ns_unused_list)) {
struct ldlm_pool *pl;
__u64 slv, lvf, lv;
break;
pl = &ns->ns_pool;
-
+
if (ns_connect_lru_resize(ns)) {
cfs_time_t la;
lock->l_last_used));
if (la == 0)
la = 1;
-
+
/* Stop when slv is not yet come from server or
* lv is smaller than it is. */
lv = lvf * la * unused;
lock->l_last_used)))
break;
}
-
+
LDLM_LOCK_GET(lock); /* dropped by bl thread */
spin_unlock(&ns->ns_unused_lock);
}
spin_unlock(&ns->ns_unused_lock);
- /* Handle only @added inserted locks. */
- left = added;
- list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
- if (left-- == 0)
- break;
-
- rc = ldlm_cli_cancel_local(lock);
- if (rc == LDLM_FL_BL_AST) {
- CFS_LIST_HEAD(head);
-
- LDLM_DEBUG(lock, "Cancel lock separately");
- list_del_init(&lock->l_bl_ast);
- list_add(&lock->l_bl_ast, &head);
- ldlm_cli_cancel_req(lock->l_conn_export, &head, 1);
- rc = LDLM_FL_LOCAL_ONLY;
- }
- if (rc == LDLM_FL_LOCAL_ONLY) {
- /* CANCEL RPC should not be sent to server. */
- list_del_init(&lock->l_bl_ast);
- LDLM_LOCK_PUT(lock);
- added--;
- }
- }
- RETURN(added);
+ RETURN(ldlm_cancel_list(cancels, added));
}
/* when called with LDLM_ASYNC the blocking callback will be handled
ldlm_mode_t mode, int lock_flags,
int flags, void *opaque)
{
- struct ldlm_lock *lock, *next;
- int count = 0, left;
+ struct ldlm_lock *lock;
+ int count = 0;
ENTRY;
lock_res(res);
unlock_res(res);
/* Handle only @count inserted locks. */
- left = count;
- list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
- int rc = LDLM_FL_LOCAL_ONLY;
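+ /* For LDLM_FL_LOCAL_ONLY every lock is canceled client-side and
+ * no CANCEL RPC is needed. */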
+ if (flags & LDLM_FL_LOCAL_ONLY)
+ RETURN(ldlm_cancel_list_local(cancels, count));
- if (left-- == 0)
- break;
- if (flags & LDLM_FL_LOCAL_ONLY)
- ldlm_lock_cancel(lock);
- else
- rc = ldlm_cli_cancel_local(lock);
-
- if (rc == LDLM_FL_BL_AST) {
- CFS_LIST_HEAD(head);
-
- LDLM_DEBUG(lock, "Cancel lock separately");
- list_del_init(&lock->l_bl_ast);
- list_add(&lock->l_bl_ast, &head);
- ldlm_cli_cancel_req(lock->l_conn_export, &head, 1);
- rc = LDLM_FL_LOCAL_ONLY;
- }
- if (rc == LDLM_FL_LOCAL_ONLY) {
- /* CANCEL RPC should not be sent to server. */
- list_del_init(&lock->l_bl_ast);
- LDLM_LOCK_PUT(lock);
- count--;
- }
- }
- RETURN(count);
+ RETURN(ldlm_cancel_list(cancels, count));
}
-/* If @req is NULL, send CANCEL request to server with handles of locks
- * in the @cancels. If EARLY_CANCEL is not supported, send CANCEL requests
+/* If @req is NULL, send a CANCEL request to the server with the handles of
+ * the locks on @cancels. If EARLY_CANCEL is not supported, send CANCEL requests
* separately per lock.
- * If @req is not NULL, put handles of locks in @cancels into the request
+ * If @req is not NULL, put the handles of the locks on @cancels into the request
* buffer at the offset @off.
* Destroy @cancels at the end. */
int ldlm_cli_cancel_list(struct list_head *cancels, int count,
if (list_empty(cancels) || count == 0)
RETURN(0);
-
+
while (count) {
LASSERT(!list_empty(cancels));
lock = list_entry(cancels->next, struct ldlm_lock, l_bl_ast);