struct ldlm_namespace *blwi_ns;
struct ldlm_lock_desc blwi_ld;
struct ldlm_lock *blwi_lock;
- struct list_head blwi_head;
- int blwi_count;
+ int blwi_flags;
};
#ifdef __KERNEL__
cfs_time_add(lock->l_last_used,
cfs_time_seconds(10)))) {
unlock_res_and_lock(lock);
- if (ldlm_bl_to_thread_lock(ns, NULL, lock))
+ if (ldlm_bl_to_thread(ns, NULL, lock, 0))
ldlm_handle_bl_callback(ns, NULL, lock);
EXIT;
}
return ptlrpc_reply(req);
}
-
-#ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
- struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
- struct list_head *cancels, int count)
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock, int flags)
{
+#ifdef __KERNEL__
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
struct ldlm_bl_work_item *blwi;
ENTRY;
- if (cancels && count == 0)
- RETURN(0);
-
OBD_ALLOC(blwi, sizeof(*blwi));
if (blwi == NULL)
RETURN(-ENOMEM);
blwi->blwi_ns = ns;
if (ld != NULL)
blwi->blwi_ld = *ld;
- if (count) {
- list_add(&blwi->blwi_head, cancels);
- list_del_init(cancels);
- blwi->blwi_count = count;
- } else {
- blwi->blwi_lock = lock;
- }
+ blwi->blwi_lock = lock;
+ blwi->blwi_flags = flags;
+
spin_lock(&blp->blp_lock);
list_add_tail(&blwi->blwi_entry, &blp->blp_list);
cfs_waitq_signal(&blp->blp_waitq);
spin_unlock(&blp->blp_lock);
RETURN(0);
-}
-#endif
-
-int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct ldlm_lock *lock)
-{
-#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
-#else
- RETURN(-ENOSYS);
-#endif
-}
-
-int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct list_head *cancels, int count)
-{
-#ifdef __KERNEL__
- RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
#else
RETURN(-ENOSYS);
#endif
CDEBUG(D_INODE, "blocking ast\n");
if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
ldlm_callback_reply(req, 0);
- if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
+ if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0))
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
break;
case LDLM_CP_CALLBACK:
if (blwi->blwi_ns == NULL)
break;
- if (blwi->blwi_count) {
+ if (blwi->blwi_flags == LDLM_FL_CANCELING) {
/* The special case when we cancel locks in lru
- * asynchronously, we pass the list of locks here.
- * Thus lock is marked LDLM_FL_CANCELING, and already
- * canceled locally. */
- ldlm_cli_cancel_list(&blwi->blwi_head,
- blwi->blwi_count, NULL, 0);
+ * asynchronously: we first remove the lock from l_bl_ast
+ * explicitly in ldlm_cancel_lru before sending it to this
+ * thread, so the lock is already marked LDLM_FL_CANCELING
+ * and cancelled locally. */
+ CFS_LIST_HEAD(head);
+ LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast));
+ list_add(&blwi->blwi_lock->l_bl_ast, &head);
+ ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export,
+ &head, 1);
+ LDLM_LOCK_PUT(blwi->blwi_lock);
} else {
ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
blwi->blwi_lock);
/* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
* a single page on the send/receive side. XXX: 512 should be changed
 * to a more adequate value. */
-static inline int ldlm_req_handles_avail(struct obd_export *exp,
- int *size, int bufcount, int off)
-{
- int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
- int old_size = size[DLM_LOCKREQ_OFF];
-
- size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
- avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
- bufcount, size);
- avail /= sizeof(struct lustre_handle);
- avail += LDLM_LOCKREQ_HANDLES - off;
- size[DLM_LOCKREQ_OFF] = old_size;
-
- return avail;
-}
-
-static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
-{
- int size[2] = { sizeof(struct ptlrpc_body),
- sizeof(struct ldlm_request) };
- return ldlm_req_handles_avail(exp, size, 2, 0);
-}
+#define ldlm_req_handles_avail(exp, size, bufcount, off) \
+({ \
+ int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); \
+ int _s = size[DLM_LOCKREQ_OFF]; \
+ size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); \
+ _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \
+ bufcount, size); \
+ _avail /= sizeof(struct lustre_handle); \
+ _avail += LDLM_LOCKREQ_HANDLES - off; \
+ size[DLM_LOCKREQ_OFF] = _s; \
+ _avail; \
+})
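
For reference, the deleted ldlm_cancel_handles_avail() wrapper can still be written as a thin user of the new macro. A minimal sketch (illustrative only; it reuses the two-buffer request layout from the deleted helper above, and the function name is hypothetical):

/* Sketch: how many lock handles fit into a standalone CANCEL request,
 * mirroring the removed ldlm_cancel_handles_avail() inline above. */
static inline int cancel_handles_avail_sketch(struct obd_export *exp)
{
        int size[2] = { sizeof(struct ptlrpc_body),
                        sizeof(struct ldlm_request) };

        return ldlm_req_handles_avail(exp, size, 2, 0);
}

Note that the macro is a GCC statement expression which reads and temporarily overwrites its size argument, so callers should pass a plain local array rather than an expression with side effects.
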
/* Cancel lru locks and pack them into the enqueue request, along with the
 * given @count locks in @cancels. */
/* Estimate the amount of free space in the request. */
int avail = ldlm_req_handles_avail(exp, size, bufcount,
LDLM_ENQUEUE_CANCEL_OFF);
- int to_free = exp_connect_lru_resize(exp) ? 0 : 1;
LASSERT(avail >= count);
+
/* Cancel lru locks here _only_ if the server supports
 * EARLY_CANCEL. Otherwise we have to send an extra CANCEL
 * rpc right on enqueue, which is slower than sending an
 * asynchronous rpc from the blocking thread. */
- count += ldlm_cancel_lru_local(ns, cancels, to_free,
+ count += ldlm_cancel_lru_local(ns, cancels,
+ exp_connect_lru_resize(exp) ? 0 : 1,
avail - count, LDLM_CANCEL_AGED);
size[DLM_LOCKREQ_OFF] =
ldlm_request_bufsize(count, LDLM_ENQUEUE);
"out of sync -- not fatal\n",
libcfs_nid2str(req->rq_import->
imp_connection->c_peer.nid));
- rc = 0;
} else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
req->rq_import_generation == imp->imp_generation) {
ptlrpc_req_finished(req);
}
rc = ldlm_cli_cancel_local(lock);
+
+ if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
+ GOTO(out, rc);
+
list_add(&lock->l_bl_ast, &cancels);
+ rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1);
+ EXIT;
+ out:
+ LDLM_LOCK_PUT(lock);
+ return rc < 0 ? rc : 0;
- if (rc == LDLM_FL_BL_AST) {
- rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1);
- } else if (rc == LDLM_FL_CANCELING) {
- int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
- int count = 1;
- LASSERT(avail > 0);
- count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace,
- &cancels, 0, avail - 1,
- LDLM_CANCEL_AGED);
- ldlm_cli_cancel_list(&cancels, count, NULL, 0);
- }
- if (rc != LDLM_FL_CANCELING)
- LDLM_LOCK_PUT(lock);
- RETURN(rc < 0 ? rc : 0);
}
-/* - Free space in lru for @count new locks,
+/* - Free space in lru for @count new locks,
* redundant unused locks are canceled locally;
* - also cancel locally unused aged locks;
* - do not cancel more than @max locks;
#endif
count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0);
if (sync == LDLM_ASYNC) {
- rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
- if (rc == 0)
- RETURN(count);
+ struct ldlm_lock *lock, *next;
+ list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) {
+ /* Remove from the list to allow the blocking thread
+ * to re-use l_bl_ast. */
+ list_del_init(&lock->l_bl_ast);
+ rc = ldlm_bl_to_thread(ns, NULL, lock,
+ LDLM_FL_CANCELING);
+ if (rc)
+ list_add_tail(&lock->l_bl_ast, &next->l_bl_ast);
+ }
}
- /* If an error occured in ASYNC mode, or
+
+ /* If some locks are left in the list in ASYNC mode, or
* this is SYNC mode, cancel the list. */
- ldlm_cli_cancel_list(&cancels, count, NULL, 0);
+ ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF);
RETURN(count);
}
count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE,
0, flags, opaque);
- rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0);
+ rc = ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);