#include <linux/obd_class.h>
#include <linux/obd.h>
-static int interrupted_completion_wait(void *data)
+/* NOTE(review): converted to void with an intentionally empty body --
+ * presumably l_wait_event()'s LWI_TIMEOUT_INTR machinery now handles
+ * the interrupted case itself, so there is nothing to do here.
+ * Confirm against the l_wait_info implementation. */
+static void interrupted_completion_wait(void *data)
{
- RETURN(1);
}
+/* Argument block handed to ldlm_expired_completion_wait() through the
+ * l_wait_info: the lock being waited on plus the import generation
+ * sampled before sleeping (forwarded to ptlrpc_fail_import() if the
+ * wait times out). */
+struct lock_wait_data {
+ struct ldlm_lock *lwd_lock; /* lock we are sleeping on */
+ int lwd_generation; /* imp_generation when the wait began */
+};
+
+/* Timeout callback for the completion wait in ldlm_completion_ast().
+ * Client side (obd found via the lock's connh): fail the import at the
+ * generation recorded before the wait, kicking off recovery.  No obd
+ * means a server-side/local lock: just log and let the caller keep
+ * sleeping.  Always returns 0. */
int ldlm_expired_completion_wait(void *data)
{
- struct ldlm_lock *lock = data;
- struct ptlrpc_connection *conn;
- struct obd_device *obd;
+ struct lock_wait_data *lwd = data;
+ struct ldlm_lock *lock = lwd->lwd_lock;
+ struct obd_device *obd = class_conn2obd(lock->l_connh);
- if (!lock)
- CERROR("NULL lock\n");
- else if (!lock->l_connh)
- CERROR("lock %p has NULL connh\n", lock);
- else if (!(obd = class_conn2obd(lock->l_connh)))
- CERROR("lock %p has NULL obd\n", lock);
- else if (!(conn = obd->u.cli.cl_import.imp_connection))
- CERROR("lock %p has NULL connection\n", lock);
- else {
- LDLM_DEBUG(lock, "timed out waiting for completion");
- CERROR("lock %p timed out from %s\n", lock,
- conn->c_remote_uuid.uuid);
- ldlm_lock_dump(D_ERROR, lock);
- class_signal_connection_failure(conn);
+ /* No obd => no client import to fail over. */
+ if (obd == NULL) {
+ LDLM_ERROR(lock, "lock timed out; not entering recovery in "
+ "server code, just going back to sleep");
+ } else {
+ struct obd_import *imp = obd->u.cli.cl_import;
+ /* lwd_generation was sampled before we went to sleep. */
+ ptlrpc_fail_import(imp, lwd->lwd_generation);
+ LDLM_ERROR(lock, "lock timed out, entering recovery for %s@%s",
+ imp->imp_target_uuid.uuid,
+ imp->imp_connection->c_remote_uuid.uuid);
}
+
RETURN(0);
}
int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
- struct l_wait_info lwi =
- LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
- interrupted_completion_wait, lock);
+ struct lock_wait_data lwd;
+ unsigned long irqflags;
+ struct obd_device *obd;
+ struct obd_import *imp = NULL;
int rc = 0;
+ struct l_wait_info lwi;
+
+ obd = class_conn2obd(lock->l_connh);
+
+ /* if this is a local lock, then there is no import */
+ if (obd != NULL)
+ imp = obd->u.cli.cl_import;
+
+ lwd.lwd_lock = lock;
+
+ lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
+ interrupted_completion_wait, &lwd);
ENTRY;
if (flags == LDLM_FL_WAIT_NOREPROC)
ldlm_reprocess_all(lock->l_resource);
noreproc:
+ if (imp != NULL) {
+ spin_lock_irqsave(&imp->imp_lock, irqflags);
+ lwd.lwd_generation = imp->imp_generation;
+ spin_unlock_irqrestore(&imp->imp_lock, irqflags);
+ }
+
/* Go to sleep until the lock is granted or cancelled. */
rc = l_wait_event(lock->l_waitq,
((lock->l_req_mode == lock->l_granted_mode) ||
ldlm_completion_callback completion,
ldlm_blocking_callback blocking,
void *data,
- void *cp_data,
struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
}
lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode,
- data, cp_data);
+ blocking, data);
if (!lock)
GOTO(out_nolock, err = -ENOMEM);
LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
ldlm_lock2handle(lock, lockh);
lock->l_flags |= LDLM_FL_LOCAL;
- err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags, completion,
- blocking);
+ err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
+ completion);
if (err != ELDLM_OK)
GOTO(out, err);
ldlm_completion_callback completion,
ldlm_blocking_callback blocking,
void *data,
- void *cp_data,
struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
rc = ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
type, cookie, cookielen, mode,
flags, completion, blocking, data,
- cp_data, lockh);
+ lockh);
RETURN(rc);
}
LASSERT(connh == lock->l_connh);
} else {
lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
- mode, data, cp_data);
+ mode, blocking, data);
if (lock == NULL)
GOTO(out_nolock, rc = -ENOMEM);
/* ugh. I set this early (instead of waiting for _enqueue)
LBUG();
/* Dump lock data into the request buffer */
- body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
ldlm_lock2desc(lock, &body->lock_desc);
body->lock_flags = *flags;
}
lock->l_connh = connh;
lock->l_export = NULL;
+ lock->l_blocking_ast = blocking;
LDLM_DEBUG(lock, "sending request");
rc = ptlrpc_queue_wait(req);
rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
/* Set a flag to prevent us from sending a CANCEL (bug 407) */
l_lock(&ns->ns_lock);
- lock->l_flags |= LDLM_FL_CANCELING;
+ lock->l_flags |= LDLM_FL_LOCAL_ONLY;
l_unlock(&ns->ns_lock);
ldlm_lock_decref_and_cancel(lockh, mode);
+
+ if (rc == ELDLM_LOCK_ABORTED) {
+ /* caller expects reply buffer 0 to have been swabbed */
+ reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+ lustre_swab_ldlm_reply);
+ if (reply == NULL) {
+ CERROR ("Can't unpack ldlm_reply\n");
+ GOTO (out_req, rc = -EPROTO);
+ }
+ }
GOTO(out_req, rc);
}
- reply = lustre_msg_buf(req->rq_repmsg, 0);
+ reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+ lustre_swab_ldlm_reply);
+ if (reply == NULL) {
+ CERROR ("Can't unpack ldlm_reply\n");
+ GOTO (out_req, rc = -EPROTO);
+ }
+
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
*flags = reply->lock_flags;
- CDEBUG(D_INFO, "local: %p, remote: %p, flags: %d\n", lock,
- (void *)(unsigned long)reply->lock_handle.addr, *flags);
+ CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: %d\n", lock,
+ reply->lock_handle.cookie, *flags);
if (type == LDLM_EXTENT) {
CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
"extent "LPU64" -> "LPU64"\n",
body->lock_desc.l_extent.start,
body->lock_desc.l_extent.end,
reply->lock_extent.start, reply->lock_extent.end);
+
+ if ((reply->lock_extent.end & ~PAGE_MASK) != ~PAGE_MASK) {
+ /* XXX Old versions of BA OST code have a fencepost bug
+ * which will cause them to grant a lock that's one
+ * byte too large. This can be safely removed after BA
+ * ships their next release -phik (02 Apr 2003) */
+ reply->lock_extent.end--;
+ } else if ((reply->lock_extent.start & ~PAGE_MASK) ==
+ ~PAGE_MASK) {
+ reply->lock_extent.start++;
+ }
+
cookie = &reply->lock_extent; /* FIXME bug 267 */
cookielen = sizeof(reply->lock_extent);
}
l_lock(&ns->ns_lock);
lock->l_completion_ast = NULL;
rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
- completion, blocking);
+ completion);
l_unlock(&ns->ns_lock);
if (lock->l_completion_ast)
lock->l_completion_ast(lock, *flags, NULL);
ldlm_completion_callback completion,
ldlm_blocking_callback blocking,
void *data,
- void *cp_data,
struct lustre_handle *lockh)
{
int rc;
rc = ldlm_cli_enqueue(connh, req, ns, parent_lock_handle,
res_id, type, cookie, cookielen, mode,
flags, completion, blocking, data,
- cp_data, lockh);
+ lockh);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_enqueue: err: %d\n", rc);
RETURN(rc);
ldlm_lock2handle(lock, &lockh);
return ldlm_cli_enqueue(lock->l_connh, NULL, NULL, NULL, junk,
lock->l_resource->lr_type, NULL, 0, -1, &flags,
- NULL, NULL, NULL, 0, &lockh);
+ NULL, NULL, NULL, &lockh);
}
static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
if (!req)
GOTO(out, rc = -ENOMEM);
- body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
memcpy(&body->lock_handle1, &lock->l_remote_handle,
sizeof(body->lock_handle1));
if (rc != ELDLM_OK)
GOTO(out, rc);
- reply = lustre_msg_buf(req->rq_repmsg, 0);
+ reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+ lustre_swab_ldlm_reply);
+ if (reply == NULL) {
+ CERROR ("Can't unpack ldlm_reply\n");
+ GOTO (out, rc = -EPROTO);
+ }
+
res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
if (res != NULL)
ldlm_reprocess_all(res);
if (lock->l_connh) {
int local_only;
+ struct obd_import *imp;
LDLM_DEBUG(lock, "client-side cancel");
/* Set this flag to prevent others from getting new references*/
l_lock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_flags |= LDLM_FL_CBPENDING;
- ldlm_cancel_callback(lock);
local_only = (lock->l_flags & LDLM_FL_LOCAL_ONLY);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ ldlm_cancel_callback(lock);
if (local_only) {
CDEBUG(D_INFO, "not sending request (at caller's "
- "instruction\n");
+ "instruction)\n");
+ goto local_cancel;
+ }
+
+ imp = class_conn2cliimp(lock->l_connh);
+ if (imp == NULL || imp->imp_invalid) {
+ CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
+ imp);
goto local_cancel;
}
- req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh),
- LDLM_CANCEL, 1, &size, NULL);
+ req = ptlrpc_prep_req(imp, LDLM_CANCEL, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
- body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
memcpy(&body->lock_handle1, &lock->l_remote_handle,
sizeof(body->lock_handle1));
req->rq_replen = lustre_msg_size(0, NULL);
rc = ptlrpc_queue_wait(req);
- ptlrpc_req_finished(req);
- if (rc == ESTALE) {
- CERROR("client/server out of sync\n");
- LBUG();
- }
- if (rc != ELDLM_OK)
+
+ if (rc == ESTALE)
+ CERROR("client/server (nid "LPU64") out of sync--not "
+ "fatal\n",
+ req->rq_import->imp_connection->c_peer.peer_nid);
+ else if (rc != ELDLM_OK)
CERROR("Got rc %d from cancel RPC: canceling "
"anyway\n", rc);
+
+ ptlrpc_req_finished(req);
local_cancel:
ldlm_lock_cancel(lock);
} else {
RETURN(rc);
}
-int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
- struct ldlm_res_id res_id, int flags)
+static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
+ struct ldlm_res_id res_id, int flags,
+ void *opaque)
{
struct ldlm_resource *res;
struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- if (lock->l_readers || lock->l_writers)
- continue;
+ if (lock->l_readers || lock->l_writers) {
+ if (flags & LDLM_FL_WARN) {
+ LDLM_ERROR(lock, "lock in use");
+ LBUG();
+ }
+ }
+ if (opaque != NULL && lock->l_data != opaque) {
+ LDLM_ERROR(lock, "data %p doesn't match opaque %p",
+ lock->l_data, opaque);
+ LBUG();
+ }
/* See CBPENDING comment in ldlm_cancel_lru */
lock->l_flags |= LDLM_FL_CBPENDING;
*
* If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
* to notify the server.
- * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback. */
+ * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
+ * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
- struct ldlm_res_id *res_id, int flags)
+ struct ldlm_res_id *res_id, int flags, void *opaque)
{
int i;
ENTRY;
RETURN(ELDLM_OK);
if (res_id)
- RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags));
+ RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
+ opaque));
l_lock(&ns->ns_lock);
for (i = 0; i < RES_HASH_SIZE; i++) {
ldlm_resource_getref(res);
rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
- flags);
+ flags, opaque);
if (rc)
CERROR("cancel_unused_res ("LPU64"): %d\n",
/* We're part of recovery, so don't wait for it. */
req->rq_level = LUSTRE_CONN_RECOVD;
- body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
ldlm_lock2desc(lock, &body->lock_desc);
body->lock_flags = flags;
rc = ptlrpc_queue_wait(req);
if (rc != ELDLM_OK)
GOTO(out, rc);
-
- reply = lustre_msg_buf(req->rq_repmsg, 0);
+
+ reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+ lustre_swab_ldlm_reply);
+ if (reply == NULL) {
+ CERROR("Can't unpack ldlm_reply\n");
+ GOTO (out, rc = -EPROTO);
+ }
+
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
LDLM_DEBUG(lock, "replayed lock:");