#define DEBUG_SUBSYSTEM S_LDLM
+#ifdef __KERNEL__
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/lustre_dlm.h>
#include <linux/lustre_mds.h>
+#else
+#include <liblustre.h>
+#include <linux/kp30.h>
+#endif
+
#include <linux/obd_class.h>
//struct lustre_lock ldlm_everything_lock;
[LDLM_EXTENT] "EXT",
};
+#ifdef __KERNEL__
char *ldlm_it2str(int it)
{
switch (it) {
return "readdir";
case IT_GETATTR:
return "getattr";
- case IT_TRUNC:
- return "truncate";
- case IT_SETATTR:
- return "setattr";
case IT_LOOKUP:
return "lookup";
case IT_UNLINK:
return "UNKNOWN";
}
}
+#endif
extern kmem_cache_t *ldlm_lock_slab;
struct lustre_lock ldlm_handle_lock;
static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
-ldlm_res_compat ldlm_res_compat_table[] = {
+static ldlm_res_compat ldlm_res_compat_table[] = {
[LDLM_PLAIN] ldlm_plain_compat,
[LDLM_EXTENT] ldlm_extent_compat,
};
return ELDLM_OK;
}
-ldlm_res_policy ldlm_res_policy_table[] = {
+static ldlm_res_policy ldlm_res_policy_table[] = {
[LDLM_PLAIN] ldlm_plain_policy,
[LDLM_EXTENT] ldlm_extent_policy,
};
if (atomic_dec_and_test(&lock->l_refc)) {
l_lock(&ns->ns_lock);
- LDLM_DEBUG0(lock, "final lock_put on destroyed lock, freeing");
+ LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
LASSERT(lock->l_destroyed);
LASSERT(list_empty(&lock->l_res_link));
list_del_init(&lock->l_export_chain);
ldlm_lock_remove_from_lru(lock);
- portals_handle_unhash(&lock->l_handle);
+ class_handle_unhash(&lock->l_handle);
#if 0
/* Wake anyone waiting for this lock */
}
INIT_LIST_HEAD(&lock->l_handle.h_link);
- portals_handle_hash(&lock->l_handle, lock_handle_addref);
+ class_handle_hash(&lock->l_handle, lock_handle_addref);
RETURN(lock);
}
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
{
- memset(&lockh->addr, 0x69, sizeof(lockh->addr));
+ POISON(&lockh->addr, 0x69, sizeof(lockh->addr));
lockh->cookie = lock->l_handle.h_cookie;
}
LASSERT(handle);
- lock = portals_handle2object(handle->cookie);
+ lock = class_handle2object(handle->cookie);
if (lock == NULL)
RETURN(NULL);
}
static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
- struct ldlm_lock *new,
+ struct ldlm_lock *new,
void *data, int datalen)
{
struct ldlm_ast_work *w;
CERROR("FL_CBPENDING set on non-local lock--just a "
"warning\n");
- LDLM_DEBUG0(lock, "final decref done on cbpending lock");
+ LDLM_DEBUG(lock, "final decref done on cbpending lock");
+
+ if (lock->l_blocking_ast == NULL) {
+ /* The lock wasn't even fully formed; just destroy it */
+ ldlm_lock_destroy(lock);
+ }
l_unlock(&ns->ns_lock);
/* FIXME: need a real 'desc' here */
- lock->l_blocking_ast(lock, NULL, lock->l_data,
- LDLM_CB_BLOCKING);
+ if (lock->l_blocking_ast != NULL)
+ lock->l_blocking_ast(lock, NULL, lock->l_data,
+ LDLM_CB_BLOCKING);
} else if (ns->ns_client && !lock->l_readers && !lock->l_writers) {
/* If this is a client-side namespace and this was the last
* reference, put it on the LRU. */
lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, NULL);
}
if (rc)
- LDLM_DEBUG0(lock, "matched");
+ LDLM_DEBUG(lock, "matched");
else
LDLM_DEBUG_NOLOCK("not matched");
ldlm_grant_lock(lock, NULL, 0);
EXIT;
out:
- l_unlock(&ns->ns_lock);
/* Don't set 'completion_ast' until here so that if the lock is granted
* immediately we don't do an unnecessary completion call. */
lock->l_completion_ast = completion;
+ l_unlock(&ns->ns_lock);
return ELDLM_OK;
}
struct ldlm_ast_work *w =
list_entry(tmp, struct ldlm_ast_work, w_list);
- if (w->w_blocking)
+ /* It's possible to receive a completion AST before we've set
+ * the l_completion_ast pointer: either because the AST arrived
+ * before the reply, or simply because there's a small race
+ * window between receiving the reply and finishing the local
+ * enqueue. (bug 842)
+ *
+ * This can't happen with the blocking_ast, however, because we
+ * will never call the local blocking_ast until we drop our
+ * reader/writer reference, which we won't do until we get the
+ * reply and finish enqueueing. */
+ if (w->w_blocking) {
+ LASSERT(w->w_lock->l_blocking_ast != NULL);
rc = w->w_lock->l_blocking_ast
(w->w_lock, &w->w_desc, w->w_data,
LDLM_CB_BLOCKING);
- else
+ } else if (w->w_lock->l_completion_ast != NULL) {
rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags,
w->w_data);
+ } else {
+ rc = 0;
+ }
if (rc == -ERESTART)
retval = rc;
else if (rc)
lock->l_blocking_ast(lock, NULL, lock->l_data,
LDLM_CB_CANCELING);
else
- LDLM_DEBUG0(lock, "no blocking ast");
+ LDLM_DEBUG(lock, "no blocking ast");
}
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}
/* Please do not, no matter how tempting, remove this LBUG without
* talking to me first. -phik */
if (lock->l_readers || lock->l_writers) {
- LDLM_DEBUG0(lock, "lock still has references");
+ LDLM_DEBUG(lock, "lock still has references");
ldlm_lock_dump(D_OTHER, lock);
LBUG();
}
CDEBUG(level, " -- Lock dump: %p (%s) (rc: %d)\n", lock, ver,
atomic_read(&lock->l_refc));
if (lock->l_export && lock->l_export->exp_connection)
- CDEBUG(level, " Node: NID %x (rhandle: "LPX64")\n",
+ CDEBUG(level, " Node: NID "LPX64" on %s (rhandle: "LPX64")\n",
lock->l_export->exp_connection->c_peer.peer_nid,
+ lock->l_export->exp_connection->c_peer.peer_ni->pni_name,
lock->l_remote_handle.cookie);
else
CDEBUG(level, " Node: local\n");