#define DEBUG_SUBSYSTEM S_LDLM
#include <linux/lustre_dlm.h>
+#include <linux/obd.h>
/* NOTE(review): the region below is a fragment of a unified diff with its
 * context elided.  The hunks belong to several distinct functions on the
 * client lock paths (enqueue, convert, cancel); the `ldlm_completion_ast`
 * signature immediately below is fused onto an unrelated body by the
 * elision.  The visible intent of the hunks is a migration from
 * ptlrpc_prep_req2(connh, ...) to ptlrpc_prep_req(class_conn2cliimp(connh),
 * ...), dropping the cached l_connection/l_client fields in favor of
 * l_export = NULL, and setting LDLM_FL_CBPENDING (under ns_lock) before a
 * client-side cancel.  None of these functions is complete here; do not
 * edit this region without the full file. */
int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
{
__u32 data_len,
struct lustre_handle *lockh)
{
- struct ptlrpc_connection *connection;
struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
type, cookie, cookielen, mode,
flags, completion, blocking, data,
data_len, lockh);
- connection = client_conn2cli(connh)->cl_conn;
*flags = 0;
lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
ldlm_lock2handle(lock, lockh);
if (req == NULL) {
- req = ptlrpc_prep_req2(connh, LDLM_ENQUEUE, 1, &size, NULL);
+ req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
+ &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req_passed_in = 0;
req->rq_replen = lustre_msg_size(1, &size);
}
lock->l_connh = connh;
- lock->l_connection = ptlrpc_connection_addref(connection);
- lock->l_client = client_conn2cli(connh)->cl_client;
+ lock->l_export = NULL;
rc = ptlrpc_queue_wait(req);
/* FIXME: status check here? */
LDLM_DEBUG(lock, "client-side convert");
- req = ptlrpc_prep_req2(connh, LDLM_CONVERT, 1, &size, NULL);
+ req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size,
+ NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
if (lock->l_connh) {
LDLM_DEBUG(lock, "client-side cancel");
- req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size,
- NULL);
+ /* Set this flag to prevent others from getting new references*/
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock->l_flags |= LDLM_FL_CBPENDING;
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+ req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh),
+ LDLM_CANCEL, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
LDLM_LOCK_PUT(lock);
return rc;
}
+
+/* Cancel all locks on a given resource that have 0 readers/writers.
+ *
+ * ns:     namespace whose ns_lock guards the resource's lock lists
+ * res_id: identifier of the resource to scan
+ *
+ * Works in two phases: (1) under ns_lock, mark each idle granted lock
+ * CBPENDING and queue a work item holding a lock reference; (2) after
+ * dropping ns_lock, cancel each queued lock via ldlm_cli_cancel().
+ *
+ * Returns 0, or -EINVAL if the resource does not exist.  Failures from
+ * individual ldlm_cli_cancel() calls are logged but not propagated. */
+int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id)
+{
+ struct ldlm_resource *res;
+ struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
+ struct ldlm_ast_work *w;
+ ENTRY;
+
+ res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
+ if (res == NULL)
+ RETURN(-EINVAL);
+ /* NOTE(review): no matching release of the reference taken by
+ * ldlm_resource_get() is visible on any return path below --
+ * looks like a resource refcount leak; confirm whether a
+ * ldlm_resource_put(res) is required before returning. */
+
+ l_lock(&ns->ns_lock);
+ list_for_each(tmp, &res->lr_granted) {
+ struct ldlm_lock *lock;
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ /* Locks still held by someone are left alone. */
+ if (lock->l_readers || lock->l_writers)
+ continue;
+
+ /* Setting the CBPENDING flag is a little misleading, but
+ * prevents an important race; namely, once CBPENDING is set,
+ * the lock can accumulate no more readers/writers. Since
+ * readers and writers are already zero here, ldlm_lock_decref
+ * won't see this flag and call l_blocking_ast */
+ lock->l_flags |= LDLM_FL_CBPENDING;
+
+ /* NOTE(review): allocation while holding ns_lock, with
+ * LASSERT instead of an error path on OBD_ALLOC failure --
+ * confirm this is acceptable for this lock/alloc context. */
+ OBD_ALLOC(w, sizeof(*w));
+ LASSERT(w);
+
+ /* Take a reference so the lock survives until the deferred
+ * cancel in phase two; dropped with LDLM_LOCK_PUT below. */
+ w->w_lock = LDLM_LOCK_GET(lock);
+ list_add(&w->w_list, &list);
+ }
+ l_unlock(&ns->ns_lock);
+
+ /* Phase two: ns_lock is dropped, so it is now safe to issue
+ * the actual cancels. */
+ list_for_each_safe(tmp, next, &list) {
+ struct lustre_handle lockh;
+ int rc;
+ w = list_entry(tmp, struct ldlm_ast_work, w_list);
+
+ ldlm_lock2handle(w->w_lock, &lockh);
+ rc = ldlm_cli_cancel(&lockh);
+ if (rc != ELDLM_OK)
+ CERROR("ldlm_cli_cancel: %d\n", rc);
+
+ LDLM_LOCK_PUT(w->w_lock);
+ list_del(&w->w_list);
+ OBD_FREE(w, sizeof(*w));
+ }
+
+ RETURN(0);
+}