* by Cluster File Systems, Inc.
*/
-#define EXPORT_SYMTAB
-
#define DEBUG_SUBSYSTEM S_LDLM
#include <linux/lustre_dlm.h>
-int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer,
- __u32 ns_id,
- struct ldlm_handle *parent_lock_handle,
+int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+ struct lustre_handle *connh,
+ struct ptlrpc_request *req,
+ struct ldlm_namespace *ns,
+ struct lustre_handle *parent_lock_handle,
__u64 *res_id,
__u32 type,
- struct ldlm_extent *req_ex,
+ void *cookie, int cookielen,
ldlm_mode_t mode,
int *flags,
+ ldlm_lock_callback callback,
void *data,
__u32 data_len,
- struct ldlm_handle *lockh,
- struct ptlrpc_request **request)
+ struct lustre_handle *lockh)
{
+ struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
- struct ptlrpc_request *req;
- char *bufs[2] = {NULL, data};
- int rc, size[2] = {sizeof(*body), data_len};
-
-#if 0
- ldlm_local_lock_enqueue(obddev, ns_id, parent_lock_handle, res_id, type,
- req_ex, mode, flags);
-#endif
-
- /* FIXME: if this is a local lock, stop here. */
+ int rc, size = sizeof(*body), req_passed_in = 1;
+ ENTRY;
- req = ptlrpc_prep_req(cl, peer, LDLM_ENQUEUE, 2, size, bufs);
- if (!req)
+ *flags = 0;
+ lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
+ data, data_len);
+ if (lock == NULL)
GOTO(out, rc = -ENOMEM);
+ LDLM_DEBUG(lock, "client-side enqueue START");
+ /* for the local lock, add the reference */
+ ldlm_lock_addref_internal(lock, mode);
+ ldlm_lock2handle(lock, lockh);
+
+ if (req == NULL) {
+ req = ptlrpc_prep_req2(cl, conn, connh,
+ LDLM_ENQUEUE, 1, &size, NULL);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+ req_passed_in = 0;
+ } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
+ LBUG();
/* Dump all of this data into the request buffer */
body = lustre_msg_buf(req->rq_reqmsg, 0);
- body->lock_desc.l_resource.lr_ns_id = ns_id;
- body->lock_desc.l_resource.lr_type = type;
- memcpy(body->lock_desc.l_resource.lr_name, res_id,
- sizeof(body->lock_desc.l_resource.lr_name));
-
- body->lock_desc.l_req_mode = mode;
- if (req_ex)
- memcpy(&body->lock_desc.l_extent, req_ex,
+ ldlm_lock2desc(lock, &body->lock_desc);
+ /* Phil: make this part of ldlm_lock2desc */
+ if (type == LDLM_EXTENT)
+ memcpy(&body->lock_desc.l_extent, cookie,
sizeof(body->lock_desc.l_extent));
- body->flags = *flags;
-
- /* FIXME: lock_handle1 will be the shadow handle */
+ body->lock_flags = *flags;
+ memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
if (parent_lock_handle)
memcpy(&body->lock_handle2, parent_lock_handle,
sizeof(body->lock_handle2));
/* Continue as normal. */
- size[0] = sizeof(*reply);
- req->rq_replen = lustre_msg_size(1, size);
+ if (!req_passed_in) {
+ size = sizeof(*reply);
+ req->rq_replen = lustre_msg_size(1, &size);
+ }
- rc = ptlrpc_queue_wait(cl, req);
+ lock->l_connection = ptlrpc_connection_addref(conn);
+ lock->l_client = cl;
+
+ rc = ptlrpc_queue_wait(req);
+ /* FIXME: status check here? */
rc = ptlrpc_check_status(req, rc);
- if (rc != ELDLM_OK)
+
+ if (rc != ELDLM_OK) {
+ LDLM_DEBUG(lock, "client-side enqueue END (%s)",
+ rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
+ LDLM_LOCK_PUT(lock);
+ ldlm_lock_decref(lockh, mode);
+ /* FIXME: if we've already received a completion AST, this will
+ * LBUG! */
+ ldlm_lock_destroy(lock);
GOTO(out, rc);
+ }
reply = lustre_msg_buf(req->rq_repmsg, 0);
- CERROR("remote handle: %p\n",
- (void *)(unsigned long)reply->lock_handle.addr);
- CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start,
- reply->lock_extent.end);
+ memcpy(&lock->l_remote_handle, &reply->lock_handle,
+ sizeof(lock->l_remote_handle));
+ if (type == LDLM_EXTENT)
+ memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
+ *flags = reply->lock_flags;
+
+ CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
+ (void *)(unsigned long)reply->lock_handle.addr, *flags);
+ CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
+ (unsigned long long)reply->lock_extent.start,
+ (unsigned long long)reply->lock_extent.end);
+
+ /* If enqueue returned a blocked lock but the completion handler has
+ * already run, then it fixed up the resource and we don't need to do it
+ * again. */
+ if ((*flags) & LDLM_FL_LOCK_CHANGED &&
+ lock->l_req_mode != lock->l_granted_mode) {
+ CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
+ "%ld\n", (long)reply->lock_resource_name[0],
+ (long)lock->l_resource->lr_name[0]);
+
+ ldlm_lock_change_resource(lock, reply->lock_resource_name);
+ if (lock->l_resource == NULL) {
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ LDLM_DEBUG(lock, "client-side enqueue, new resource");
+ }
+
+ if (!req_passed_in)
+ ptlrpc_free_req(req);
+ rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
+ callback);
+
+ if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+ LDLM_FL_BLOCK_CONV)) {
+ /* Go to sleep until the lock is granted. */
+ /* FIXME: or cancelled. */
+ LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
+ " sleeping");
+ ldlm_lock_dump(lock);
+#warning ldlm needs to time out
+ wait_event(lock->l_waitq,
+ lock->l_req_mode == lock->l_granted_mode);
+ LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
+ }
+ LDLM_DEBUG(lock, "client-side enqueue END");
+ LDLM_LOCK_PUT(lock);
EXIT;
out:
- *request = req;
return rc;
}
-int ldlm_cli_namespace_new(struct ptlrpc_client *cl, struct lustre_peer *peer,
- __u32 ns_id, struct ptlrpc_request **request)
+int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
+ struct ptlrpc_connection *conn,
+ struct lustre_handle *connh,
+ struct ptlrpc_request *req,
+ struct ldlm_namespace *ns,
+ struct lustre_handle *parent_lock_handle,
+ __u64 *res_id,
+ __u32 type,
+ void *cookie, int cookielen,
+ ldlm_mode_t mode,
+ int *flags,
+ ldlm_lock_callback callback,
+ void *data,
+ __u32 data_len,
+ struct lustre_handle *lockh)
+{
+ int rc;
+ ENTRY;
+ rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
+ if (rc == 0) {
+ rc = ldlm_cli_enqueue(cl, conn, connh, req, ns,
+ parent_lock_handle, res_id, type, cookie,
+ cookielen, mode, flags, callback, data,
+ data_len, lockh);
+ if (rc != ELDLM_OK)
+ CERROR("ldlm_cli_enqueue: err: %d\n", rc);
+ RETURN(rc);
+ } else
+ RETURN(0);
+}
+
+int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
+ void *data, __u32 data_len)
{
+ struct ldlm_lock *lock;
struct ldlm_request *body;
struct ptlrpc_request *req;
- int rc, size = sizeof(*body);
+ struct ptlrpc_client *cl;
+ int rc = 0, size = sizeof(*body);
+ ENTRY;
- req = ptlrpc_prep_req(cl, peer, LDLM_NAMESPACE_NEW, 1, &size, NULL);
+ lock = ldlm_handle2lock(lockh);
+ if (lock == NULL) {
+ LBUG();
+ RETURN(-EINVAL);
+ }
+ cl = &lock->l_resource->lr_namespace->ns_rpc_client;
+ req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
+ &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
body = lustre_msg_buf(req->rq_reqmsg, 0);
- body->lock_desc.l_resource.lr_ns_id = ns_id;
+ memcpy(&body->lock_handle1, &lock->l_remote_handle,
+ sizeof(body->lock_handle1));
+
+ if (desc == NULL) {
+ CDEBUG(D_NET, "Sending granted AST\n");
+ ldlm_lock2desc(lock, &body->lock_desc);
+ } else {
+ CDEBUG(D_NET, "Sending blocked AST\n");
+ memcpy(&body->lock_desc, desc, sizeof(*desc));
+ }
+
+ LDLM_DEBUG(lock, "server preparing %s AST",
+ desc == 0 ? "completion" : "blocked");
req->rq_replen = lustre_msg_size(0, NULL);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
EXIT;
out:
- *request = req;
+ LDLM_LOCK_PUT(lock);
return rc;
}
-int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, __u32 data_len)
+int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
+ struct lustre_handle *connh,
+ int new_mode, int *flags)
{
struct ldlm_request *body;
+ struct ldlm_reply *reply;
+ struct ldlm_lock *lock;
+ struct ldlm_resource *res;
struct ptlrpc_request *req;
- struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
- struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
- int rc, size[2] = {sizeof(*body), data_len};
- char *bufs[2] = {NULL, data};
+ int rc, size = sizeof(*body);
+ ENTRY;
+
+ lock = ldlm_handle2lock(lockh);
+ if (!lock) {
+ LBUG();
+ RETURN(-EINVAL);
+ }
+ *flags = 0;
+
+ LDLM_DEBUG(lock, "client-side convert");
- req = ptlrpc_prep_req(cl, &lock->l_peer, LDLM_CALLBACK, 2, size, bufs);
+ req = ptlrpc_prep_req(cl, lock->l_connection,
+ LDLM_CONVERT, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
memcpy(&body->lock_handle1, &lock->l_remote_handle,
sizeof(body->lock_handle1));
- if (new != NULL)
- ldlm_lock2desc(new, &body->lock_desc);
+ body->lock_desc.l_req_mode = new_mode;
+ body->lock_flags = *flags;
+
+ size = sizeof(*reply);
+ req->rq_replen = lustre_msg_size(1, &size);
+
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ reply = lustre_msg_buf(req->rq_repmsg, 0);
+ res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
+ if (res != NULL)
+ ldlm_reprocess_all(res);
+ if (lock->l_req_mode != lock->l_granted_mode) {
+ /* Go to sleep until the lock is granted. */
+ /* FIXME: or cancelled. */
+ CDEBUG(D_NET, "convert returned a blocked lock, "
+ "going to sleep.\n");
+ wait_event(lock->l_waitq,
+ lock->l_req_mode == lock->l_granted_mode);
+ CDEBUG(D_NET, "waking up, the lock must be granted.\n");
+ }
+ LDLM_LOCK_PUT(lock);
+ EXIT;
+ out:
+ ptlrpc_free_req(req);
+ return rc;
+}
+
+int ldlm_cli_cancel(struct lustre_handle *lockh)
+{
+ struct ptlrpc_request *req;
+ struct ldlm_lock *lock;
+ struct ldlm_request *body;
+ int rc, size = sizeof(*body);
+ ENTRY;
+
+ lock = ldlm_handle2lock(lockh);
+ if (!lock) {
+ /* It's possible that the decref that we did just before this
+ * cancel was the last reader/writer, and caused a cancel before
+ * we could call this function. If we want to make this
+ * impossible (by adding a dec_and_cancel() or similar), then
+ * we can put the LBUG back. */
+ //LBUG();
+ RETURN(-EINVAL);
+ }
+
+ LDLM_DEBUG(lock, "client-side cancel");
+ req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
+ LDLM_CANCEL, 1, &size, NULL);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ memcpy(&body->lock_handle1, &lock->l_remote_handle,
+ sizeof(body->lock_handle1));
req->rq_replen = lustre_msg_size(0, NULL);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
ptlrpc_free_req(req);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+ ldlm_lock_cancel(lock);
+ LDLM_LOCK_PUT(lock);
EXIT;
out:
- return rc;
+ return 0;
}