struct obd_device *obd;
if (lock->l_conn_export == NULL) {
- static unsigned long next_dump = 0;
+ static unsigned long next_dump = 0, last_dump = 0;
LDLM_ERROR(lock, "lock timed out; not entering recovery in "
"server code, just going back to sleep");
if (time_after(jiffies, next_dump)) {
- ldlm_namespace_dump(lock->l_resource->lr_namespace);
- if (next_dump == 0)
- portals_debug_dumplog();
+ last_dump = next_dump;
next_dump = jiffies + 300 * HZ;
+ ldlm_namespace_dump(D_DLMTRACE,
+ lock->l_resource->lr_namespace);
+ if (last_dump == 0)
+ portals_debug_dumplog();
}
RETURN(0);
}
l_unlock(&ns->ns_lock);
ldlm_lock_decref_and_cancel(lockh, mode);
+
+ /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
+ * from llite/file.c/ll_file_flock(). */
+ if (lock->l_resource->lr_type == LDLM_FLOCK) {
+ ldlm_lock_destroy(lock);
+ }
}
int ldlm_cli_enqueue(struct obd_export *exp,
GOTO(cleanup, rc = -EPROTO);
}
- /* XXX - Phil, wasn't sure if this should go before or after the
- * lustre_swab_repbuf() ? If we can't unpack the reply then we
- * don't know what occurred on the server so I think the safest
- * bet is to cleanup the lock as if it didn't make it ? */
-
/* lock enqueued on the server */
cleanup_phase = 1;
rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
if (lock->l_completion_ast != NULL) {
int err = lock->l_completion_ast(lock, *flags, NULL);
- if (!rc) {
- cleanup_phase = 2;
+ if (!rc)
rc = err;
- }
+ if (rc)
+ cleanup_phase = 2;
}
}
static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
int *flags)
{
+ struct ldlm_resource *res;
+ int rc;
ENTRY;
if (lock->l_resource->lr_namespace->ns_client) {
CERROR("Trying to cancel local lock\n");
}
LDLM_DEBUG(lock, "client-side local convert");
- ldlm_lock_convert(lock, new_mode, flags);
- ldlm_reprocess_all(lock->l_resource);
-
+ res = ldlm_lock_convert(lock, new_mode, flags);
+ if (res) {
+ ldlm_reprocess_all(res);
+ rc = 0;
+ } else {
+ rc = EDEADLOCK;
+ }
LDLM_DEBUG(lock, "client-side local convert handler END");
LDLM_LOCK_PUT(lock);
- RETURN(0);
+ RETURN(rc);
}
/* FIXME: one of ldlm_cli_convert or the server side should reject attempted
* conversion of locks which are on the waiting or converting queue */
+/* Caller of this code is supposed to take care of lock readers/writers
+ accounting */
int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
{
struct ldlm_request *body;
struct ldlm_reply *reply;
struct ldlm_lock *lock;
struct ldlm_resource *res;
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req = NULL;
int rc, size = sizeof(*body);
ENTRY;
GOTO (out, rc = -EPROTO);
}
+ if (req->rq_status)
+ GOTO(out, rc = req->rq_status);
+
res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
- if (res != NULL)
+ if (res != NULL) {
ldlm_reprocess_all(res);
- /* Go to sleep until the lock is granted. */
- /* FIXME: or cancelled. */
- if (lock->l_completion_ast)
- lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, NULL);
+ /* Go to sleep until the lock is granted. */
+ /* FIXME: or cancelled. */
+ if (lock->l_completion_ast) {
+ rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
+ NULL);
+ if (rc)
+ GOTO(out, rc);
+ }
+ } else {
+ rc = EDEADLOCK;
+ }
EXIT;
out:
LDLM_LOCK_PUT(lock);
/* Set this flag to prevent others from getting new references*/
l_lock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_flags |= LDLM_FL_CBPENDING;
- local_only = (lock->l_flags & LDLM_FL_LOCAL_ONLY);
+ local_only = lock->l_flags & LDLM_FL_LOCAL_ONLY;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_cancel_callback(lock);
ENTRY;
if (ns == NULL) {
- CERROR("must pass in namespace");
+ CERROR("must pass in namespace\n");
LBUG();
}