#include <linux/lustre_dlm.h>
int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+ struct lustre_handle *connh,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle,
ENTRY;
*flags = 0;
- rc = ldlm_local_lock_create(ns, parent_lock_handle, res_id, type, mode,
- data, data_len, lockh);
- if (rc != ELDLM_OK)
- GOTO(out, rc);
-
- lock = lustre_handle2object(lockh);
+ lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
+ data, data_len);
+ if (lock == NULL)
+ GOTO(out, rc = -ENOMEM);
LDLM_DEBUG(lock, "client-side enqueue START");
+ /* for the local lock, add the reference */
+ ldlm_lock_addref_internal(lock, mode);
+ ldlm_lock2handle(lock, lockh);
if (req == NULL) {
- req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 1, &size, NULL);
+ req = ptlrpc_prep_req2(cl, conn, connh,
+ LDLM_ENQUEUE, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req_passed_in = 0;
/* Dump all of this data into the request buffer */
body = lustre_msg_buf(req->rq_reqmsg, 0);
- body->lock_desc.l_resource.lr_type = type;
- memcpy(body->lock_desc.l_resource.lr_name, res_id,
- sizeof(body->lock_desc.l_resource.lr_name));
-
- body->lock_desc.l_req_mode = mode;
+ ldlm_lock2desc(lock, &body->lock_desc);
+ /* Phil: make this part of ldlm_lock2desc */
if (type == LDLM_EXTENT)
memcpy(&body->lock_desc.l_extent, cookie,
sizeof(body->lock_desc.l_extent));
body->lock_flags = *flags;
- memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1));
-
+ memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
if (parent_lock_handle)
memcpy(&body->lock_handle2, parent_lock_handle,
sizeof(body->lock_handle2));
req->rq_replen = lustre_msg_size(1, &size);
}
- lock->l_connection = conn;
+ lock->l_connection = ptlrpc_connection_addref(conn);
lock->l_client = cl;
rc = ptlrpc_queue_wait(req);
+ /* FIXME: status check here? */
rc = ptlrpc_check_status(req, rc);
- spin_lock(&lock->l_lock);
if (rc != ELDLM_OK) {
LDLM_DEBUG(lock, "client-side enqueue END (%s)",
rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
- spin_lock(&lock->l_resource->lr_lock);
- if (!ldlm_resource_put(lock->l_resource))
- spin_unlock(&lock->l_resource->lr_lock);
- ldlm_lock_free(lock);
+ ldlm_lock_put(lock);
+ ldlm_lock_decref(lockh, mode);
+ /* FIXME: if we've already received a completion AST, this will
+ * LBUG! */
+ ldlm_lock_destroy(lock);
GOTO(out, rc);
}
CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
"%ld\n", (long)reply->lock_resource_name[0],
(long)lock->l_resource->lr_name[0]);
- spin_lock(&lock->l_resource->lr_lock);
- if (!ldlm_resource_put(lock->l_resource))
- spin_unlock(&lock->l_resource->lr_lock);
- lock->l_resource =
- ldlm_resource_get(ns, NULL, reply->lock_resource_name,
- type, 1);
+ ldlm_lock_change_resource(lock, reply->lock_resource_name);
if (lock->l_resource == NULL) {
LBUG();
RETURN(-ENOMEM);
if (!req_passed_in)
ptlrpc_free_req(req);
- spin_unlock(&lock->l_lock);
- rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback,
+ rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
callback);
- LDLM_DEBUG(lock, "client-side enqueue END");
if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
LDLM_FL_BLOCK_CONV)) {
/* Go to sleep until the lock is granted. */
/* FIXME: or cancelled. */
- CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
- "going to sleep.\n", lock);
+ LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
+ " sleeping");
ldlm_lock_dump(lock);
wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
lock->l_granted_mode);
- CDEBUG(D_NET, "waking up, the lock must be granted.\n");
+ LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
}
+ LDLM_DEBUG(lock, "client-side enqueue END");
+ ldlm_lock_put(lock);
EXIT;
out:
return rc;
}
-int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, __u32 data_len, struct ptlrpc_request **reqp)
+int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
+ void *data, __u32 data_len)
{
+ struct ldlm_lock *lock;
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct ptlrpc_client *cl =
- &lock->l_resource->lr_namespace->ns_rpc_client;
+ struct ptlrpc_client *cl;
int rc = 0, size = sizeof(*body);
ENTRY;
+ lock = ldlm_handle2lock(lockh);
+ if (lock == NULL)
+ LBUG();
+ cl = &lock->l_resource->lr_namespace->ns_rpc_client;
req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
&size, NULL);
if (!req)
memcpy(&body->lock_handle1, &lock->l_remote_handle,
sizeof(body->lock_handle1));
- if (new == NULL) {
+ if (desc == NULL) {
CDEBUG(D_NET, "Sending granted AST\n");
ldlm_lock2desc(lock, &body->lock_desc);
} else {
CDEBUG(D_NET, "Sending blocked AST\n");
- ldlm_lock2desc(new, &body->lock_desc);
- ldlm_object2handle(new, &body->lock_handle2);
+ memcpy(&body->lock_desc, desc, sizeof(*desc));
}
LDLM_DEBUG(lock, "server preparing %s AST",
- new == NULL ? "completion" : "blocked");
+ desc == 0 ? "completion" : "blocked");
req->rq_replen = lustre_msg_size(0, NULL);
- if (reqp == NULL) {
- LBUG();
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
- ptlrpc_free_req(req);
- } else
- *reqp = req;
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
EXIT;
out:
+ ldlm_lock_put(lock);
return rc;
}
int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
+ struct lustre_handle *connh,
int new_mode, int *flags)
{
struct ldlm_request *body;
int rc, size = sizeof(*body);
ENTRY;
- lock = lustre_handle2object(lockh);
+ lock = ldlm_handle2lock(lockh);
+ if (!lock)
+ LBUG();
*flags = 0;
LDLM_DEBUG(lock, "client-side convert");
- req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 1, &size,
- NULL);
+ req = ptlrpc_prep_req(cl, lock->l_connection,
+ LDLM_CONVERT, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
GOTO(out, rc);
reply = lustre_msg_buf(req->rq_repmsg, 0);
- res = ldlm_local_lock_convert(lockh, new_mode, &reply->lock_flags);
+ res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
if (res != NULL)
ldlm_reprocess_all(res);
if (lock->l_req_mode != lock->l_granted_mode) {
lock->l_granted_mode);
CDEBUG(D_NET, "waking up, the lock must be granted.\n");
}
+ ldlm_lock_put(lock);
EXIT;
out:
ptlrpc_free_req(req);
return rc;
}
-int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
+int ldlm_cli_cancel(struct lustre_handle *lockh,
+ struct lustre_handle *connh)
{
struct ptlrpc_request *req;
+ struct ldlm_lock *lock;
struct ldlm_request *body;
- struct ldlm_resource *res;
int rc, size = sizeof(*body);
ENTRY;
+ lock = ldlm_handle2lock(lockh);
+ if (!lock)
+ LBUG();
+
LDLM_DEBUG(lock, "client-side cancel");
- req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size,
- NULL);
+ req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
+ LDLM_CANCEL, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
if (rc != ELDLM_OK)
GOTO(out, rc);
- res = ldlm_local_lock_cancel(lock);
- if (res != NULL)
- ldlm_reprocess_all(res);
- else
- rc = ELDLM_RESOURCE_FREED;
+ ldlm_lock_cancel(lock);
+ ldlm_lock_put(lock);
EXIT;
out:
- return rc;
+ return 0;
}