#include <linux/lustre_dlm.h>
#include <linux/lustre_mds.h>
-static kmem_cache_t *ldlm_lock_slab;
+kmem_cache_t *ldlm_lock_slab;
int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
l_lock(&ns->ns_lock);
type = lock->l_resource->lr_type;
- for (i = 0; i < lock->l_refc; i++) {
- int rc;
- rc = ldlm_resource_put(lock->l_resource);
- if (rc == 1 && i != lock->l_refc - 1)
- LBUG();
- }
-
lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
if (lock->l_resource == NULL) {
LBUG();
RETURN(-ENOMEM);
}
- for (i = 1; i < lock->l_refc; i++)
+ /* move references over */
+ for (i = 0; i < lock->l_refc; i++) {
+ int rc;
ldlm_resource_addref(lock->l_resource);
+ rc = ldlm_resource_put(lock->l_resource);
+ if (rc == 1 && i != lock->l_refc - 1)
+ LBUG();
+ }
l_unlock(&ns->ns_lock);
RETURN(0);
return lockmode_compat(a->l_req_mode, b->l_req_mode);
}
-/* Args: unreferenced, locked lock
- *
- * Caller must do its own ldlm_resource_put() on lock->l_resource */
-void ldlm_lock_free(struct ldlm_lock *lock)
-{
- if (!list_empty(&lock->l_children)) {
- CERROR("lock %p still has children!\n", lock);
- ldlm_lock_dump(lock);
- LBUG();
- }
-
- if (lock->l_readers || lock->l_writers)
- CDEBUG(D_INFO, "lock still has references (%d readers, %d "
- "writers)\n", lock->l_readers, lock->l_writers);
-
- if (lock->l_connection)
- ptlrpc_put_connection(lock->l_connection);
- kmem_cache_free(ldlm_lock_slab, lock);
-}
-
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
ldlm_res2desc(lock->l_resource, &desc->l_resource);
}
lock->l_flags |= LDLM_FL_AST_SENT;
-
+ /* FIXME: this should merely add the lock to the lr_tmp list */
lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
/* If we received a blocked AST and this was the last reference,
* run the callback. */
if (!lock->l_readers && !lock->l_writers &&
- (lock->l_flags & LDLM_FL_BLOCKED_PENDING)) {
+ (lock->l_flags & LDLM_FL_CBPENDING)) {
if (!lock->l_resource->lr_namespace->ns_client) {
- CERROR("LDLM_FL_DYING set on non-local lock!\n");
+ CERROR("LDLM_FL_CBPENDING set on non-local lock!\n");
LBUG();
}
- CDEBUG(D_INFO, "final decref done on dying lock, "
+ CDEBUG(D_INFO, "final decref done on cbpending lock, "
"calling callback.\n");
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_blocking_ast(lock, NULL, lock->l_data,
lock->l_data_len, NULL);
} else
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_lock_put(lock);
EXIT;
}
-static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
+static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
struct list_head *queue)
{
struct list_head *tmp, *pos;
- int rc = 0;
+ int rc = 1;
list_for_each_safe(tmp, pos, queue) {
struct ldlm_lock *child;
continue;
compat = ldlm_res_compat_table[child->l_resource->lr_type];
- if (compat(child, lock)) {
+ if (compat && compat(child, lock)) {
CDEBUG(D_OTHER, "compat function succeded, next.\n");
continue;
}
continue;
}
- rc = 1;
+ rc = 0;
CDEBUG(D_OTHER, "compat function failed and lock modes "
"incompat\n");
ENTRY;
l_lock(&lock->l_resource->lr_namespace->ns_lock);
- rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
+ rc = ldlm_lock_compat_list(lock, send_cbs, &lock->l_resource->lr_granted);
/* FIXME: should we be sending ASTs to converting? */
- rc |= _ldlm_lock_compat(lock, send_cbs,
- &lock->l_resource->lr_converting);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ if (rc)
+ rc = ldlm_lock_compat_list
+ (lock, send_cbs, &lock->l_resource->lr_converting);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
RETURN(rc);
}
-void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
+/* NOTE: called by
+ - ldlm_handle_enqueuque - resource
+*/
+void ldlm_grant_lock(struct ldlm_lock *lock)
{
+ struct ldlm_resource *res = lock->l_resource;
struct ptlrpc_request *req = NULL;
ENTRY;
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_resource_add_lock(res, &res->lr_granted, lock);
lock->l_granted_mode = lock->l_req_mode;
res->lr_most_restr = lock->l_granted_mode;
if (lock->l_completion_ast) {
+ /* FIXME: this should merely add lock to lr_tmp list */
lock->l_completion_ast(lock, NULL, lock->l_data,
lock->l_data_len, &req);
if (req != NULL) {
- struct list_head *list = res->lr_tmp;
+ struct list_head *list = &res->lr_tmp;
if (list == NULL) {
LBUG();
return;
list_add(&req->rq_multi, list);
}
}
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
EXIT;
}
}
/* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
-ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
- void *cookie, int cookie_len,
- int *flags,
- ldlm_lock_callback completion,
- ldlm_lock_callback blocking)
+ldlm_error_t ldlm_lock_enqueue(struct lustre_lock *lock,
+ void *cookie, int cookie_len,
+ int *flags,
+ ldlm_lock_callback completion,
+ ldlm_lock_callback blocking)
{
struct ldlm_resource *res;
struct ldlm_lock *lock;
*flags |= LDLM_FL_BLOCK_WAIT;
GOTO(out, ELDLM_OK);
}
- incompat = ldlm_lock_compat(lock, 0);
- if (incompat) {
+ if (!ldlm_lock_compat(lock,0)) {
ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
*flags |= LDLM_FL_BLOCK_GRANTED;
GOTO(out, ELDLM_OK);
return ELDLM_OK;
}
-/* Must be called with resource->lr_lock taken. */
+/* Must be called with namespace taken: queue is waiting or converting. */
static int ldlm_reprocess_queue(struct ldlm_resource *res,
- struct list_head *converting)
+ struct list_head *queue)
{
struct list_head *tmp, *pos;
ENTRY;
- list_for_each_safe(tmp, pos, converting) {
+ list_for_each_safe(tmp, pos, queue) {
struct ldlm_lock *pending;
pending = list_entry(tmp, struct ldlm_lock, l_res_link);
CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
- if (ldlm_lock_compat(pending, 1))
+ if (!ldlm_lock_compat(pending, 1))
RETURN(1);
list_del_init(&pending->l_res_link);
}
/* Must be called with lock and lock->l_resource unlocked */
-struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
+void ldlm_lock_cancel(struct ldlm_lock *lock)
{
struct ldlm_resource *res;
struct ldlm_namespace *ns;
CDEBUG(D_INFO, "lock still has references (%d readers, %d "
"writers)\n", lock->l_readers, lock->l_writers);
- ldlm_resource_del_lock(lock);
+ ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy(lock);
-
l_unlock(&ns->ns_lock);
-
- RETURN(res);
}
/* Must be called with lock and lock->l_resource unlocked */
-struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
- int new_mode, int *flags)
+struct ldlm_resource *ldlm_convert(struct lustre_handle *lockh, int new_mode, int *flags)
{
struct ldlm_lock *lock;
struct ldlm_resource *res;
l_lock(&ns->ns_lock);
lock->l_req_mode = new_mode;
- ldlm_resource_del_lock(lock);
+ ldlm_resource_unlink_lock(lock);
/* If this is a local resource, put it on the appropriate list. */
if (res->lr_namespace->ns_client) {
- if (*flags & LDLM_FL_BLOCK_CONV)
+ if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
ldlm_resource_add_lock(res, res->lr_converting.prev,
lock);
- else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
- ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
- else
- ldlm_grant_lock(res, lock);
+ else {
+ ldlm_grant_lock(lock);
+ /* FIXME: completion handling not with ns_lock held ! */
+ wake_up(&lock->l_waitq);
+ }
} else {
list_add(&lock->l_res_link, res->lr_converting.prev);
}
if (!lock)
GOTO(out, -ENOMEM);
- ldlm_lock2handle(&lockh);
memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
sizeof(lock->l_remote_handle));
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
flags = dlm_req->lock_flags;
- err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
+ err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
ldlm_cli_callback, ldlm_cli_callback);
if (err != ELDLM_OK)
GOTO(out, err);
dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
dlm_rep->lock_flags = flags;
- memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
+ ldlm_lock2handle(lock, &dlm_rep->lock_handle);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
memcpy(&dlm_rep->lock_extent, &lock->l_extent,
sizeof(lock->l_extent));
dlm_rep->lock_flags = dlm_req->lock_flags;
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
- if (!lock)
- RETURN(0);
-
- LDLM_DEBUG(lock, "server-side convert handler START");
- ldlm_local_lock_convert(&dlm_req->lock_handle1,
- dlm_req->lock_desc.l_req_mode,
- &dlm_rep->lock_flags);
- req->rq_status = 0;
+ if (!lock) {
+ req->rq_status = EINVAL;
+ } else {
+ LDLM_DEBUG(lock, "server-side convert handler START");
+ ldlm_lock_convert(&dlm_req->lock_handle1,
+ dlm_req->lock_desc.l_req_mode,
+ &dlm_rep->lock_flags);
+ req->rq_status = 0;
+ }
if (ptlrpc_reply(svc, req) != 0)
LBUG();
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
- if (!lock)
- RETURN(0);
-
- LDLM_DEBUG(lock, "server-side cancel handler START");
- ldlm_local_lock_cancel(lock);
- req->rq_status = 0;
+ if (!lock) {
+ req->rq_status = ESTALE;
+ } else {
+ LDLM_DEBUG(lock, "server-side cancel handler START");
+ ldlm_local_lock_cancel(lock);
+ req->rq_status = 0;
+ }
if (ptlrpc_reply(svc, req) != 0)
LBUG();
struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
- struct ldlm_lock *lock, *new;
+ struct ldlm_lock_desc desc;
+ struct ldlm_lock_desc *descp;
+ struct ldlm_lock *lock;
+ __u64 is_blocking_ast;
int rc;
ENTRY;
RETURN(rc);
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
- new = ldlm_handle2lock(&dlm_req->lock_handle2);
+ is_blocking_ast = dlm_req->lock_handle2.addr;
+ if (is_blocking_ast) {
+ /* FIXME: copy lock information into desc here */
+ descp = &desc;
+ } else
+ descp = NULL;
+
if (!lock) {
CERROR("callback on lock %Lx - lock disappeared\n",
dlm_req->lock_handle.addr);
LDLM_DEBUG(lock, "client %s callback handler START",
new == NULL ? "completion" : "blocked");
- spin_lock(&lock->l_resource->lr_lock);
- spin_lock(&lock->l_lock);
- if (!new) {
- lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
-
- /* If we receive the completion AST before the actual enqueue
- * returned, then we might need to switch resources. */
- if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
- lock->l_resource->lr_name,
- sizeof(__u64) * RES_NAME_SIZE) != 0) {
-
- ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
- LDLM_DEBUG(lock, "completion AST, new resource");
- }
- ldlm_grant_lock(lock);
- /* XXX Fixme: we want any completion function, not just wake_up */
- wake_up(&lock->l_waitq);
- ldlm_lock_put(lock);
- } else {
+ if (is_blocking_ast) {
int do_ast;
l_lock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_flags |= LDLM_FL_CBPENDING;
CDEBUG(D_INFO, "Lock already unused, calling "
"callback (%p).\n", lock->l_blocking_ast);
if (lock->l_blocking_ast != NULL)
- lock->l_blocking_ast(lock, new, lock->l_data,
+ lock->l_blocking_ast(lock, descp, lock->l_data,
lock->l_data_len, NULL);
} else {
LDLM_DEBUG(lock, "Lock still has references, will be"
" cancelled later");
}
ldlm_lock_put(lock);
+ } else {
+ lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
+
+ /* If we receive the completion AST before the actual enqueue
+ * returned, then we might need to switch resources. */
+ if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
+ lock->l_resource->lr_name,
+ sizeof(__u64) * RES_NAME_SIZE) != 0) {
+ ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
+ LDLM_DEBUG(lock, "completion AST, new resource");
+ }
+ ldlm_grant_lock(lock);
+ /* FIXME: we want any completion function, not just wake_up */
+ wake_up(&lock->l_waitq);
+ ldlm_lock_put(lock);
}
LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
new == NULL ? "completion" : "blocked", lock);
-
RETURN(0);
}
if (lock == NULL)
GOTO(out, rc = -ENOMEM);
- ldlm_lock2handle(lock, &lockh);
LDLM_DEBUG(lock, "client-side enqueue START");
if (req == NULL) {
sizeof(body->lock_desc.l_extent));
body->lock_flags = *flags;
- memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1));
-
+ ldlm_lock2handle(lock, &body->lock_handle1);
if (parent_lock_handle)
memcpy(&body->lock_handle2, parent_lock_handle,
sizeof(body->lock_handle2));
if (!req_passed_in)
ptlrpc_free_req(req);
- rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback,
+ rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
callback);
LDLM_DEBUG(lock, "client-side enqueue END");
int rc, size = sizeof(*body);
ENTRY;
- lock = lustre_handle2object(lockh);
+ lock = ldlm_handle2lock(lockh);
+ if (!lock)
+ LBUG();
*flags = 0;
LDLM_DEBUG(lock, "client-side convert");
lock->l_granted_mode);
CDEBUG(D_NET, "waking up, the lock must be granted.\n");
}
+ ldlm_put_lock(lock);
EXIT;
out:
ptlrpc_free_req(req);
return rc;
}
-int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
+int ldlm_cli_cancel(struct ptlrpc_client *cl, struct lustre_handle *lockh)
{
struct ptlrpc_request *req;
+ struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_resource *res;
int rc, size = sizeof(*body);
ENTRY;
+ lock = ldlm_handle2lock(lockh);
+ if (!lock)
+ LBUG();
+
LDLM_DEBUG(lock, "client-side cancel");
req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size,
NULL);
if (rc != ELDLM_OK)
GOTO(out, rc);
- res = ldlm_local_lock_cancel(lock);
- if (res != NULL)
- ldlm_reprocess_all(res);
- else
- rc = ELDLM_RESOURCE_FREED;
+ ldlm_lock_cancel(lock);
+ ldlm_reprocess_all(res);
+ ldlm_lock_put(lock);
EXIT;
out:
- return rc;
+ return 0;
}