#define DEBUG_SUBSYSTEM S_LDLM
#ifdef __KERNEL__
-#include <linux/slab.h>
-#include <linux/module.h>
-#include <linux/lustre_dlm.h>
-#include <linux/lustre_mds.h>
+# include <linux/slab.h>
+# include <linux/module.h>
+# include <linux/lustre_dlm.h>
+# include <linux/lustre_mds.h>
#else
-#include <liblustre.h>
-#include <linux/kp30.h>
+# include <liblustre.h>
+# include <linux/kp30.h>
#endif
#include <linux/obd_class.h>
+#include "ldlm_internal.h"
//struct lustre_lock ldlm_everything_lock;
if (lock->l_parent)
LDLM_LOCK_PUT(lock->l_parent);
- PORTAL_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
+ OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
l_unlock(&ns->ns_lock);
}
if (resource == NULL)
LBUG();
- PORTAL_SLAB_ALLOC(lock, ldlm_lock_slab, sizeof(*lock));
+ OBD_SLAB_ALLOC(lock, ldlm_lock_slab, SLAB_KERNEL, sizeof(*lock));
if (lock == NULL)
RETURN(NULL);
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
{
- POISON(&lockh->addr, 0x69, sizeof(lockh->addr));
lockh->cookie = lock->l_handle.h_cookie;
}
LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
-/* Args: unlocked lock */
-int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
- struct ldlm_res_id, int flags);
-
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
struct ldlm_namespace *ns;
"warning\n");
LDLM_DEBUG(lock, "final decref done on cbpending lock");
-
- if (lock->l_blocking_ast == NULL) {
- /* The lock wasn't even fully formed; just destroy it */
- ldlm_lock_destroy(lock);
- }
l_unlock(&ns->ns_lock);
/* FIXME: need a real 'desc' here */
if (lock->l_blocking_ast != NULL)
lock->l_blocking_ast(lock, NULL, lock->l_data,
LDLM_CB_BLOCKING);
+ else
+ LDLM_DEBUG(lock, "No blocking AST?");
} else if (ns->ns_client && !lock->l_readers && !lock->l_writers) {
/* If this is a client-side namespace and this was the last
* reference, put it on the LRU. */
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_flags |= LDLM_FL_CBPENDING;
- ldlm_lock_decref_internal(lock, mode);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ ldlm_lock_decref_internal(lock, mode);
LDLM_LOCK_PUT(lock);
}
if (lock == old_lock)
break;
- if (lock->l_flags & LDLM_FL_CBPENDING)
+ /* llite sometimes wants to match locks that will be
+ * canceled when their users drop, but we allow it to match
+ * if it passes in CBPENDING and the lock still has users.
+ * this is generally only going to be used by children
+ * whose parents already hold a lock so forward progress
+ * can still happen. */
+ if (lock->l_flags & LDLM_FL_CBPENDING &&
+ !(flags & LDLM_FL_CBPENDING))
+ continue;
+ if (lock->l_flags & LDLM_FL_CBPENDING &&
+ lock->l_readers == 0 && lock->l_writers == 0)
continue;
if (lock->l_req_mode != mode)
* server (ie, connh is NULL)
* If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
* list will be considered
+ * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
+ * to be canceled can still be matched as long as they still have reader
+ * or writer refernces
*
* Returns 1 if it finds an already-existing lock that is compatible; in this
* case, lockh is filled in with a addref()ed lock
GOTO(out, rc = 1);
EXIT;
- out:
+ out:
ldlm_resource_putref(res);
l_unlock(&ns->ns_lock);
if (lock) {
ldlm_lock2handle(lock, lockh);
if (lock->l_completion_ast)
- lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, NULL);
+ lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
+ NULL);
}
if (rc)
LDLM_DEBUG(lock, "matched");
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle,
struct ldlm_res_id res_id, __u32 type,
- ldlm_mode_t mode, void *data, void *cp_data)
+ ldlm_mode_t mode,
+ ldlm_blocking_callback blocking,
+ void *data)
{
struct ldlm_resource *res, *parent_res = NULL;
struct ldlm_lock *lock, *parent_lock = NULL;
lock->l_req_mode = mode;
lock->l_data = data;
- lock->l_cp_data = cp_data;
+ lock->l_blocking_ast = blocking;
RETURN(lock);
}
struct ldlm_lock **lockp,
void *cookie, int cookie_len,
int *flags,
- ldlm_completion_callback completion,
- ldlm_blocking_callback blocking)
+ ldlm_completion_callback completion)
{
struct ldlm_resource *res;
struct ldlm_lock *lock = *lockp;
ENTRY;
res = lock->l_resource;
- lock->l_blocking_ast = blocking;
if (res->lr_type == LDLM_EXTENT)
memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
*flags |= LDLM_FL_BLOCK_GRANTED;
GOTO(out, ELDLM_OK);
}
-
- if (lock->l_granted_cb != NULL && lock->l_data != NULL) {
- /* We just -know- */
- struct ptlrpc_request *req = lock->l_data;
- lock->l_granted_cb(lock, req->rq_repmsg, 0);
- }
ldlm_grant_lock(lock, NULL, 0);
EXIT;
out:
l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (!(lock->l_flags & LDLM_FL_CANCEL)) {
lock->l_flags |= LDLM_FL_CANCEL;
- if (lock->l_blocking_ast)
+ if (lock->l_blocking_ast) {
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_blocking_ast(lock, NULL, lock->l_data,
LDLM_CB_CANCELING);
- else
+ return;
+ } else {
LDLM_DEBUG(lock, "no blocking ast");
+ }
}
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}
LBUG();
}
- ldlm_cancel_callback(lock);
+ ldlm_cancel_callback(lock); /* XXX FIXME bug 1030 */
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy(lock);
EXIT;
}
-int ldlm_lock_set_data(struct lustre_handle *lockh, void *data, void *cp_data)
+int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
{
struct ldlm_lock *lock = ldlm_handle2lock(lockh);
ENTRY;
RETURN(-EINVAL);
lock->l_data = data;
- lock->l_cp_data = cp_data;
LDLM_LOCK_PUT(lock);
void ldlm_lock_dump(int level, struct ldlm_lock *lock)
{
char ver[128];
+ struct obd_device *obd;
if (!((portal_debug | D_ERROR) & level))
return;
CDEBUG(level, " -- Lock dump: %p (%s) (rc: %d)\n", lock, ver,
atomic_read(&lock->l_refc));
- if (lock->l_export && lock->l_export->exp_connection)
+ obd = class_conn2obd(lock->l_connh);
+ if (lock->l_export && lock->l_export->exp_connection) {
CDEBUG(level, " Node: NID "LPX64" on %s (rhandle: "LPX64")\n",
lock->l_export->exp_connection->c_peer.peer_nid,
lock->l_export->exp_connection->c_peer.peer_ni->pni_name,
lock->l_remote_handle.cookie);
- else
+ } else if (obd == NULL) {
CDEBUG(level, " Node: local\n");
+ } else {
+ struct obd_import *imp = obd->u.cli.cl_import;
+ CDEBUG(level, " Node: NID "LPX64" on %s (rhandle: "LPX64")\n",
+ imp->imp_connection->c_peer.peer_nid,
+ imp->imp_connection->c_peer.peer_ni->pni_name,
+ lock->l_remote_handle.cookie);
+ }
CDEBUG(level, " Parent: %p\n", lock->l_parent);
CDEBUG(level, " Resource: %p ("LPD64")\n", lock->l_resource,
lock->l_resource->lr_name.name[0]);