void ldlm_lock_destroy_nolock(struct ldlm_lock *lock);
/* ldlm_lockd.c */
-int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct ldlm_lock *lock, int flags);
+int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock);
+int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+ struct list_head *cancels, int count);
+
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
ldlm_lock_remove_from_lru(lock);
unlock_res_and_lock(lock);
if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
- ldlm_bl_to_thread(ns, NULL, lock, 0) != 0)
+ ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
ldlm_handle_bl_callback(ns, NULL, lock);
} else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
!lock->l_readers && !lock->l_writers &&
struct ldlm_namespace *blwi_ns;
struct ldlm_lock_desc blwi_ld;
struct ldlm_lock *blwi_lock;
- int blwi_flags;
+ struct list_head blwi_head;
+ int blwi_count;
};
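/*
 * A minimal standalone sketch (plain C, not Lustre code; "toy_lock" and
 * "blwi_process" are hypothetical) of how the reworked work item carries
 * either payload: blwi_count == 0 means a single lock in blwi_lock,
 * blwi_count > 0 means a whole cancel list spliced onto blwi_head.
 */
#include <stdio.h>

struct list_head { struct list_head *next, *prev; };

struct toy_lock { int id; };

struct toy_blwi {
        struct toy_lock  *blwi_lock;   /* valid when blwi_count == 0 */
        struct list_head  blwi_head;   /* valid when blwi_count > 0 */
        int               blwi_count;
};

static void blwi_process(struct toy_blwi *blwi)
{
        if (blwi->blwi_count)          /* batched: cancel the whole list */
                printf("cancel %d listed locks\n", blwi->blwi_count);
        else                           /* single blocking callback */
                printf("blocking callback on lock %d\n", blwi->blwi_lock->id);
}

int main(void)
{
        struct toy_lock lk = { .id = 7 };
        struct toy_blwi single = { .blwi_lock = &lk };
        struct toy_blwi batch  = { .blwi_count = 3 };

        blwi_process(&single);
        blwi_process(&batch);
        return 0;
}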
#ifdef __KERNEL__
cfs_time_add(lock->l_last_used,
cfs_time_seconds(10)))) {
unlock_res_and_lock(lock);
- if (ldlm_bl_to_thread(ns, NULL, lock, 0))
+ if (ldlm_bl_to_thread_lock(ns, NULL, lock))
ldlm_handle_bl_callback(ns, NULL, lock);
EXIT;
return ptlrpc_reply(req);
}
-int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct ldlm_lock *lock, int flags)
-{
#ifdef __KERNEL__
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+ struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+ struct list_head *cancels, int count)
+{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
struct ldlm_bl_work_item *blwi;
ENTRY;
+ if (cancels && count == 0)
+ RETURN(0);
+
OBD_ALLOC(blwi, sizeof(*blwi));
if (blwi == NULL)
RETURN(-ENOMEM);
blwi->blwi_ns = ns;
if (ld != NULL)
blwi->blwi_ld = *ld;
- blwi->blwi_lock = lock;
- blwi->blwi_flags = flags;
-
+ if (count) {
+ list_add(&blwi->blwi_head, cancels);
+ list_del_init(cancels);
+ blwi->blwi_count = count;
+ } else {
+ blwi->blwi_lock = lock;
+ }
spin_lock(&blp->blp_lock);
list_add_tail(&blwi->blwi_entry, &blp->blp_list);
cfs_waitq_signal(&blp->blp_waitq);
spin_unlock(&blp->blp_lock);
RETURN(0);
+}
+#endif
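/*
 * A standalone sketch (plain C, not kernel code) of the
 * list_add()/list_del_init() pair used above: inserting the new head right
 * after the old one and then unlinking the old head re-parents the whole
 * chain in O(1), which is how ldlm_bl_to_thread() takes ownership of
 * @cancels without walking it.
 */
#include <stdio.h>

struct list_head { struct list_head *next, *prev; };

static void INIT_LIST_HEAD(struct list_head *h) { h->next = h->prev = h; }

static void list_add(struct list_head *n, struct list_head *head)
{
        n->next = head->next;
        n->prev = head;
        head->next->prev = n;
        head->next = n;
}

static void list_del_init(struct list_head *n)
{
        n->prev->next = n->next;
        n->next->prev = n->prev;
        INIT_LIST_HEAD(n);
}

int main(void)
{
        struct list_head cancels, blwi_head, a, b, c;

        INIT_LIST_HEAD(&cancels);
        list_add(&a, &cancels);
        list_add(&b, &cancels);
        list_add(&c, &cancels);         /* cancels: c, b, a */

        list_add(&blwi_head, &cancels); /* the same two calls as the patch */
        list_del_init(&cancels);

        printf("cancels now empty: %d\n", cancels.next == &cancels);
        for (struct list_head *p = blwi_head.next; p != &blwi_head; p = p->next)
                printf("moved node %p\n", (void *)p);
        return 0;
}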
+
+int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+ struct ldlm_lock *lock)
+{
+#ifdef __KERNEL__
+ RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+#else
+ RETURN(-ENOSYS);
+#endif
+}
+
+int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+ struct list_head *cancels, int count)
+{
+#ifdef __KERNEL__
+ RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
#else
RETURN(-ENOSYS);
#endif
CDEBUG(D_INODE, "blocking ast\n");
if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
ldlm_callback_reply(req, 0);
- if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0))
+ if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
break;
case LDLM_CP_CALLBACK:
if (blwi->blwi_ns == NULL)
break;
- if (blwi->blwi_flags == LDLM_FL_CANCELING) {
+ if (blwi->blwi_count) {
/* The special case when we cancel locks in lru
- * asynchronously, then we first remove the lock from
- * l_bl_ast explicitely in ldlm_cancel_lru before
- * sending it to this thread. Thus lock is marked
- * LDLM_FL_CANCELING, and already cancelled locally. */
- CFS_LIST_HEAD(head);
- LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast));
- list_add(&blwi->blwi_lock->l_bl_ast, &head);
- ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export,
- &head, 1, 0);
- LDLM_LOCK_PUT(blwi->blwi_lock);
+ * asynchronously: the whole list of locks is passed
+ * here, already marked LDLM_FL_CANCELING and
+ * canceled locally. */
+ ldlm_cli_cancel_list(&blwi->blwi_head,
+ blwi->blwi_count, NULL, 0, 0);
} else {
ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
blwi->blwi_lock);
/* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
* a single page on the send/receive side. XXX: 512 should be changed
 * to a more adequate value. */
-#define ldlm_req_handles_avail(exp, size, bufcount, off) \
-({ \
- int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); \
- int _s = size[DLM_LOCKREQ_OFF]; \
- size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); \
- _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \
- bufcount, size); \
- _avail /= sizeof(struct lustre_handle); \
- _avail += LDLM_LOCKREQ_HANDLES - off; \
- size[DLM_LOCKREQ_OFF] = _s; \
- _avail; \
-})
+static inline int ldlm_req_handles_avail(struct obd_export *exp,
+ int *size, int bufcount, int off)
+{
+ int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
+ int old_size = size[DLM_LOCKREQ_OFF];
+
+ size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+ avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
+ bufcount, size);
+ avail /= sizeof(struct lustre_handle);
+ avail += LDLM_LOCKREQ_HANDLES - off;
+ size[DLM_LOCKREQ_OFF] = old_size;
+
+ return avail;
+}
+
+static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
+{
+ int size[2] = { sizeof(struct ptlrpc_body),
+ sizeof(struct ldlm_request) };
+ return ldlm_req_handles_avail(exp, size, 2, 0);
+}
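/*
 * A worked example of the sizing arithmetic above, using made-up numbers
 * (the page size, max request size, 8-byte handle and 600-byte message
 * overhead are assumptions here, not the real Lustre constants): take the
 * usable payload of one page, subtract the fixed message overhead, count
 * how many lustre_handle slots remain, and credit the handles already
 * embedded in struct ldlm_request.
 */
#include <stdio.h>

#define TOY_PAGE_SIZE        4096
#define TOY_MAXREQSIZE       (5 * 1024)   /* assumed LDLM_MAXREQSIZE */
#define TOY_HANDLE_SIZE      8            /* assumed sizeof(struct lustre_handle) */
#define TOY_LOCKREQ_HANDLES  2            /* handles embedded in ldlm_request */

static int toy_handles_avail(int msg_overhead, int off)
{
        int avail = TOY_MAXREQSIZE < TOY_PAGE_SIZE - 512 ?
                    TOY_MAXREQSIZE : TOY_PAGE_SIZE - 512;

        avail -= msg_overhead;                /* headers + ldlm_request body */
        avail /= TOY_HANDLE_SIZE;             /* extra handle slots that fit */
        avail += TOY_LOCKREQ_HANDLES - off;   /* credit the embedded handles */
        return avail;
}

int main(void)
{
        /* e.g. ~600 bytes of lustre_msg + ptlrpc_body + ldlm_request */
        printf("%d cancel handles fit in one request\n",
               toy_handles_avail(600, 0));
        return 0;
}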
/* Cancel lru locks and pack them into the enqueue request; also pack the
 * given @count locks in @cancels. */
"out of sync -- not fatal\n",
libcfs_nid2str(req->rq_import->
imp_connection->c_peer.nid));
+ rc = 0;
} else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
req->rq_import_generation == imp->imp_generation) {
ptlrpc_req_finished(req);
int ldlm_cli_cancel(struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
- CFS_LIST_HEAD(head);
+ CFS_LIST_HEAD(cancels);
int rc = 0;
ENTRY;
}
rc = ldlm_cli_cancel_local(lock);
- if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
- GOTO(out, rc);
-
- list_add(&lock->l_bl_ast, &head);
- rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
- EXIT;
-out:
- LDLM_LOCK_PUT(lock);
- return rc < 0 ? rc : 0;
+ list_add(&lock->l_bl_ast, &cancels);
+
+ if (rc == LDLM_FL_BL_AST) {
+ rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1, 0);
+ } else if (rc == LDLM_FL_CANCELING) {
+ int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+ int count = 1;
+ LASSERT(avail > 0);
+ count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace,
+ &cancels, 0, avail - 1,
+ LDLM_CANCEL_AGED);
+ ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+ }
+ if (rc != LDLM_FL_CANCELING)
+ LDLM_LOCK_PUT(lock);
+ RETURN(rc < 0 ? rc : 0);
}
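/*
 * A sketch of the batching policy above, with hypothetical inputs: one
 * explicit cancel always goes out, and once the lock has reached
 * LDLM_FL_CANCELING, up to (avail - 1) aged lru locks are piggy-backed
 * onto the same cancel RPC.  "avail" and "lru_aged" are stand-in numbers,
 * not values from a real namespace.
 */
#include <stdio.h>

static int cancel_batch(int avail, int lru_aged)
{
        int count = 1;                         /* the explicitly canceled lock */

        count += lru_aged < avail - 1 ? lru_aged : avail - 1;
        return count;                          /* locks packed into one RPC */
}

int main(void)
{
        printf("batch = %d\n", cancel_batch(375, 50));    /* -> 51 */
        printf("batch = %d\n", cancel_batch(375, 1000));  /* -> 375 */
        return 0;
}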
/* - Free space in lru for @count new locks,
#endif
count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0);
if (sync == LDLM_ASYNC) {
- struct ldlm_lock *lock, *next;
- list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) {
- /* Remove from the list to allow blocking thread to
- * re-use l_bl_ast. */
- list_del_init(&lock->l_bl_ast);
- rc = ldlm_bl_to_thread(ns, NULL, lock,
- LDLM_FL_CANCELING);
- if (rc)
- list_add_tail(&lock->l_bl_ast, &next->l_bl_ast);
- }
+ rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
+ if (rc == 0)
+ RETURN(count);
}
- /* If some locks are left in the list in ASYNC mode, or
+ /* If an error occurred in ASYNC mode, or
* this is SYNC mode, cancel the list. */
- ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF, 0);
+ ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
RETURN(0);
}
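/*
 * A stub-level sketch of the control flow above ("hand_off_to_bl_thread"
 * and "cancel_list_inline" are hypothetical stand-ins, not the real API):
 * in ASYNC mode the whole pre-canceled list is handed to the blocking
 * thread in a single work item, and only if that hand-off fails, or in
 * SYNC mode, is the list canceled inline.
 */
#include <stdio.h>

enum sync_mode { SYNC, ASYNC };

static int hand_off_to_bl_thread(int count)
{
        return count > 0 ? 0 : -1;      /* pretend empty lists can't be queued */
}

static void cancel_list_inline(int count)
{
        printf("canceling %d locks synchronously\n", count);
}

static void toy_cancel_lru(int count, enum sync_mode mode)
{
        if (mode == ASYNC && hand_off_to_bl_thread(count) == 0) {
                printf("handed %d locks to the blocking thread\n", count);
                return;                 /* the blocking thread owns the list */
        }
        cancel_list_inline(count);      /* error path, or SYNC mode */
}

int main(void)
{
        toy_cancel_lru(4, ASYNC);
        toy_cancel_lru(4, SYNC);
        return 0;
}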
count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
0, flags, opaque);
- rc = ldlm_cli_cancel_list(&cancels, count, NULL,
- DLM_LOCKREQ_OFF, flags);
+ rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);