#define LDLM_NAMESPACE_SERVER 0
#define LDLM_NAMESPACE_CLIENT 1
-#define LDLM_FL_LOCK_CHANGED (1 << 0) /* extent, mode, or resource changed */
+#define LDLM_FL_LOCK_CHANGED 0x000001 /* extent, mode, or resource changed */
/* If the server returns one of these flags, then the lock was put on that list.
* If the client sends one of these flags (during recovery ONLY!), it wants the
* lock added to the specified list, no questions asked. -p */
-#define LDLM_FL_BLOCK_GRANTED (1 << 1)
-#define LDLM_FL_BLOCK_CONV (1 << 2)
-#define LDLM_FL_BLOCK_WAIT (1 << 3)
+#define LDLM_FL_BLOCK_GRANTED 0x000002
+#define LDLM_FL_BLOCK_CONV 0x000004
+#define LDLM_FL_BLOCK_WAIT 0x000008
-#define LDLM_FL_CBPENDING (1 << 4) // this lock is being destroyed
-#define LDLM_FL_AST_SENT (1 << 5) // blocking or cancel packet was sent
-#define LDLM_FL_WAIT_NOREPROC (1 << 6)// not a real lock flag,not saved in lock
-#define LDLM_FL_CANCEL (1 << 7) // cancellation callback already run
+#define LDLM_FL_CBPENDING 0x000010 /* this lock is being destroyed */
+#define LDLM_FL_AST_SENT 0x000020 /* blocking or cancel packet was sent */
+#define LDLM_FL_WAIT_NOREPROC 0x000040 /* not a real flag, not saved in lock */
+#define LDLM_FL_CANCEL 0x000080 /* cancellation callback already run */
/* Lock is being replayed. This could probably be implied by the fact that one
* of BLOCK_{GRANTED,CONV,WAIT} is set, but that is pretty dangerous. */
-#define LDLM_FL_REPLAY (1 << 8)
+#define LDLM_FL_REPLAY 0x000100
-#define LDLM_FL_INTENT_ONLY (1 << 9) /* don't grant lock, just do intent */
-#define LDLM_FL_LOCAL_ONLY (1 << 10) /* see ldlm_cli_cancel_unused */
+#define LDLM_FL_INTENT_ONLY 0x000200 /* don't grant lock, just do intent */
+#define LDLM_FL_LOCAL_ONLY 0x000400 /* see ldlm_cli_cancel_unused */
/* don't run the cancel callback under ldlm_cli_cancel_unused */
-#define LDLM_FL_NO_CALLBACK (1 << 11)
+#define LDLM_FL_NO_CALLBACK 0x000800
-#define LDLM_FL_HAS_INTENT (1 << 12) /* lock request has intent */
-#define LDLM_FL_CANCELING (1 << 13) /* lock cancel has already been sent */
-#define LDLM_FL_LOCAL (1 << 14) // a local lock (ie, no srv/cli split)
-#define LDLM_FL_WARN (1 << 15) /* see ldlm_cli_cancel_unused */
-#define LDLM_FL_MATCH_DATA (1 << 16) /* see ldlm_lock_match */
+#define LDLM_FL_HAS_INTENT 0x001000 /* lock request has intent */
+#define LDLM_FL_CANCELING 0x002000 /* lock cancel has already been sent */
+#define LDLM_FL_LOCAL 0x004000 /* local lock (ie, no srv/cli split) */
+#define LDLM_FL_WARN 0x008000 /* see ldlm_cli_cancel_unused */
+#define LDLM_FL_DISCARD_DATA 0x010000 /* discard (no writeback) on cancel */
+#define LDLM_FL_MATCH_DATA 0x020000 /* see ldlm_lock_match */
+
+/* These are flags that are mapped into the flags and ASTs of blocking locks */
+#define LDLM_AST_DISCARD_DATA 0x80000000 /* Add FL_DISCARD to blocking ASTs */
+
+/* Flags sent in AST lock_flags to be mapped into the receiving lock. */
+#define LDLM_AST_FLAGS (LDLM_FL_DISCARD_DATA)
/* The blocking callback is overloaded to perform two functions. These flags
* indicate which operation should be performed. */
if (lock->l_resource == NULL) { \
CDEBUG(level, "### " format \
" ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "\
- "res: \?\? rrc=\?\? type: \?\?\? remote: " \
+ "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: " \
LPX64"\n" , ## a, lock, lock->l_handle.h_cookie, \
atomic_read(&lock->l_refc), \
lock->l_readers, lock->l_writers, \
ldlm_lockname[lock->l_granted_mode], \
ldlm_lockname[lock->l_req_mode], \
- lock->l_remote_handle.cookie); \
+ lock->l_flags, lock->l_remote_handle.cookie); \
break; \
} \
if (lock->l_resource->lr_type == LDLM_EXTENT) { \
CDEBUG(level, "### " format \
" ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \
"res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64\
- "] remote: "LPX64"\n" , ## a, \
+ "] flags: %x remote: "LPX64"\n" , ## a, \
lock->l_resource->lr_namespace->ns_name, lock, \
lock->l_handle.h_cookie, atomic_read(&lock->l_refc), \
lock->l_readers, lock->l_writers, \
atomic_read(&lock->l_resource->lr_refcount), \
ldlm_typename[lock->l_resource->lr_type], \
lock->l_extent.start, lock->l_extent.end, \
- lock->l_remote_handle.cookie); \
+ lock->l_flags, lock->l_remote_handle.cookie); \
break; \
} \
{ \
CDEBUG(level, "### " format \
" ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \
- "res: "LPU64"/"LPU64" rrc: %d type: %s remote: "LPX64 \
- "\n" , ## a, \
+ "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x " \
+ "remote: "LPX64"\n" , ## a, \
lock->l_resource->lr_namespace->ns_name, \
lock, lock->l_handle.h_cookie, \
atomic_read (&lock->l_refc), \
lock->l_resource->lr_name.name[1], \
atomic_read(&lock->l_resource->lr_refcount), \
ldlm_typename[lock->l_resource->lr_type], \
- lock->l_remote_handle.cookie); \
+ lock->l_flags, lock->l_remote_handle.cookie); \
} \
} while (0)
*/
#define LDLM_ITER_CONTINUE 1 /* keep iterating */
-#define LDLM_ITER_STOP 0 /* stop iterating */
+#define LDLM_ITER_STOP 2 /* stop iterating */
typedef int (*ldlm_iterator_t)(struct ldlm_lock *, void *);
typedef int (*ldlm_res_iterator_t)(struct ldlm_resource *, void *);
void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen);
int ldlm_lock_match(struct ldlm_namespace *ns, int flags, struct ldlm_res_id *,
__u32 type, void *cookie, int cookielen, ldlm_mode_t mode,
void *data, struct lustre_handle *);
-struct ldlm_lock *
-ldlm_lock_create(struct ldlm_namespace *ns,
- struct lustre_handle *parent_lock_handle, struct ldlm_res_id,
- __u32 type, ldlm_mode_t, ldlm_blocking_callback,
- void *data);
-ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
- void *cookie, int cookie_len, int *flags,
- ldlm_completion_callback completion);
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
int *flags);
void ldlm_lock_cancel(struct ldlm_lock *lock);
#endif
#include <linux/lustre_dlm.h>
-#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
/* invariants:
RETURN(1);
}
+#include "ldlm_internal.h"
+
/* The purpose of this function is to return:
* - the maximum extent
* - containing the requested extent
+/* ldlm_request.c */
int ldlm_cancel_lru(struct ldlm_namespace *ns);
+
+/* ldlm_lock.c */
+void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
+ int run_ast);
+struct ldlm_lock *
+ldlm_lock_create(struct ldlm_namespace *ns,
+ struct lustre_handle *parent_lock_handle, struct ldlm_res_id,
+ __u32 type, ldlm_mode_t, ldlm_blocking_callback,
+ ldlm_completion_callback, void *data);
+ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
+ void *cookie, int cookie_len, int *flags);
# include <linux/slab.h>
# include <linux/module.h>
# include <linux/lustre_dlm.h>
-# include <linux/lustre_mds.h>
#else
# include <liblustre.h>
# include <linux/kp30.h>
if (new && (lock->l_flags & LDLM_FL_AST_SENT))
GOTO(out, 0);
+ CDEBUG(D_OTHER, "lock %p incompatible; sending blocking AST.\n", lock);
+
OBD_ALLOC(w, sizeof(*w));
if (!w) {
LBUG();
rc = 0;
- if (send_cbs && child->l_blocking_ast != NULL) {
- CDEBUG(D_OTHER, "lock %p incompatible; sending "
- "blocking AST.\n", child);
+ if (send_cbs && child->l_blocking_ast != NULL)
ldlm_add_ast_work_item(child, lock, NULL, 0);
- }
}
return rc;
* - ldlm_reprocess_queue
* - ldlm_lock_convert
*/
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen)
+void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
+ int run_ast)
{
struct ldlm_resource *res = lock->l_resource;
ENTRY;
if (lock->l_granted_mode < res->lr_most_restr)
res->lr_most_restr = lock->l_granted_mode;
- if (lock->l_completion_ast != NULL)
+ if (run_ast && lock->l_completion_ast != NULL)
ldlm_add_ast_work_item(lock, NULL, data, datalen);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
struct ldlm_res_id res_id, __u32 type,
ldlm_mode_t mode,
ldlm_blocking_callback blocking,
+ ldlm_completion_callback completion,
void *data)
{
struct ldlm_resource *res, *parent_res = NULL;
lock->l_req_mode = mode;
lock->l_data = data;
lock->l_blocking_ast = blocking;
+ lock->l_completion_ast = completion;
RETURN(lock);
}
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
struct ldlm_lock **lockp,
- void *cookie, int cookie_len,
- int *flags,
- ldlm_completion_callback completion)
+ void *cookie, int cookie_len, int *flags)
{
struct ldlm_resource *res;
struct ldlm_lock *lock = *lockp;
else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
else
- ldlm_grant_lock(lock, NULL, 0);
+ ldlm_grant_lock(lock, NULL, 0, 0);
GOTO(out, ELDLM_OK);
} else if (*flags & LDLM_FL_REPLAY) {
if (*flags & LDLM_FL_BLOCK_CONV) {
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
GOTO(out, ELDLM_OK);
} else if (*flags & LDLM_FL_BLOCK_GRANTED) {
- ldlm_grant_lock(lock, NULL, 0);
+ ldlm_grant_lock(lock, NULL, 0, 0);
GOTO(out, ELDLM_OK);
}
/* If no flags, fall through to normal enqueue path. */
*flags |= LDLM_FL_BLOCK_GRANTED;
GOTO(out, ELDLM_OK);
}
- ldlm_grant_lock(lock, NULL, 0);
+ ldlm_grant_lock(lock, NULL, 0, 0);
EXIT;
out:
- /* Don't set 'completion_ast' until here so that if the lock is granted
- * immediately we don't do an unnecessary completion call. */
- lock->l_completion_ast = completion;
l_unlock(&ns->ns_lock);
return ELDLM_OK;
}
RETURN(1);
list_del_init(&pending->l_res_link);
- ldlm_grant_lock(pending, NULL, 0);
+ ldlm_grant_lock(pending, NULL, 0, 1);
}
RETURN(0);
struct ldlm_namespace *ns;
ENTRY;
+ /* There's no race between calling this and taking the ns lock below;
+ * a lock can only be put on the waiting list once, because it can only
+ * issue a blocking AST once. */
ldlm_del_waiting_lock(lock);
res = lock->l_resource;
LBUG();
res->lr_tmp = &rpc_list;
- ldlm_grant_lock(lock, NULL, 0);
+ ldlm_grant_lock(lock, NULL, 0, 0);
res->lr_tmp = NULL;
granted = 1;
/* FIXME: completion handling not with ns_lock held ! */
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define EXPORT_SYMTAB
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
#define DEBUG_SUBSYSTEM S_LDLM
#ifdef __KERNEL__
#include <linux/lustre_dlm.h>
#include <linux/obd_class.h>
+#include "ldlm_internal.h"
+
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
dlm_req->lock_desc.l_resource.lr_name,
dlm_req->lock_desc.l_resource.lr_type,
dlm_req->lock_desc.l_req_mode,
- blocking_callback, NULL);
+ blocking_callback, completion_callback, NULL);
if (!lock)
GOTO(out, err = -ENOMEM);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
- &flags, completion_callback);
+ &flags);
if (err)
GOTO(out, err);
LDLM_DEBUG(lock, "completion AST, new resource");
}
lock->l_resource->lr_tmp = &ast_list;
- ldlm_grant_lock(lock, req, sizeof(*req));
+ ldlm_grant_lock(lock, req, sizeof(*req), 1);
lock->l_resource->lr_tmp = NULL;
LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
l_unlock(&ns->ns_lock);
#include <linux/obd_class.h>
#include <linux/obd.h>
+#include "ldlm_internal.h"
+
static void interrupted_completion_wait(void *data)
{
}
}
lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode,
- blocking, data);
+ blocking, completion, data);
if (!lock)
GOTO(out_nolock, err = -ENOMEM);
LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
ldlm_lock2handle(lock, lockh);
lock->l_flags |= LDLM_FL_LOCAL;
- err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
- completion);
+ err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
if (err != ELDLM_OK)
GOTO(out, err);
LASSERT(connh == lock->l_connh);
} else {
lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
- mode, blocking, data);
+ mode, blocking, completion, data);
if (lock == NULL)
GOTO(out_nolock, rc = -ENOMEM);
- /* ugh. I set this early (instead of waiting for _enqueue)
- * because the completion AST might arrive early, and we need
- * (in just this one case) to run the completion_cb even if it
- * arrives before the reply. */
- lock->l_completion_ast = completion;
LDLM_DEBUG(lock, "client-side enqueue START");
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, mode);
}
if (!is_replay) {
- l_lock(&ns->ns_lock);
- lock->l_completion_ast = NULL;
- rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
- completion);
- l_unlock(&ns->ns_lock);
+ rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
if (lock->l_completion_ast)
lock->l_completion_ast(lock, *flags, NULL);
}
void *data)
{
struct ldlm_resource *res;
- int rc = 0;
ENTRY;
if (ns == NULL) {
}
l_lock(&ns->ns_lock);
- rc = ldlm_resource_foreach(res, iter, data);
+ ldlm_resource_foreach(res, iter, data);
l_unlock(&ns->ns_lock);
ldlm_resource_putref(res);
EXIT;