#include <linux/lustre_lib.h>
#include <linux/lustre_net.h>
#include <linux/lustre_idl.h>
+#include <linux/lustre_dlm.h>
#include <linux/obd_class.h>
#include <linux/lustre_log.h>
#include <linux/lustre_cmobd.h>
}
/* XXX layering violation! -phil */
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
+
/* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
* such that filter_blocking_ast is called just before l_i_p takes the
- * ns_lock, then by the time we get the lock, we might not be the
+ * res lock, then by the time we get the lock, we might not be the
* correct blocking function anymore. So check, and return early, if
* so. */
if (lock->l_blocking_ast != cache_blocking_ast) {
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
RETURN(0);
}
lock->l_flags |= LDLM_FL_CBPENDING;
do_ast = (!lock->l_readers && !lock->l_writers);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
if (do_ast) {
struct lustre_handle lockh;
* list. */
#define LDLM_FL_KMS_IGNORE 0x200000
+/* completion AST to be executed */
+#define LDLM_FL_CP_REQD 0x400000
+
+/* cleanup_resource has already handled the lock */
+#define LDLM_FL_CLEANED 0x800000
+
+/* optimization hint: LDLM can run the blocking callback from the current
+ * context, without involving a separate thread, in order to decrease the
+ * context-switch rate -bzzz */
+#define LDLM_FL_ATOMIC_CB 0x1000000
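+
+/* A minimal sketch (not a new code path) of how this hint is consumed on
+ * the final decref, per ldlm_lock_decref_internal():
+ *
+ *	if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
+ *	    ldlm_bl_to_thread(ns, NULL, lock) != 0)
+ *		ldlm_handle_bl_callback(ns, NULL, lock);
+ */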
+
+
/* The blocking callback is overloaded to perform two functions. These flags
* indicate which operation should be performed. */
#define LDLM_CB_BLOCKING 1
-
*/
+/*
+ * Locking rules: in each group below, a lock may be taken only while
+ * holding the lock(s) listed above it (outer lock first):
+ *
+ * lr_lock
+ *
+ * lr_lock
+ * waiting_locks_spinlock
+ *
+ * lr_lock
+ * led_lock
+ *
+ * lr_lock
+ * ns_unused_lock
+ *
+ * lr_lvb_sem
+ * lr_lock
+ *
+ */
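+
+/* For illustration, the lr_lock/ns_unused_lock rule above means LRU list
+ * manipulation nests inside the resource lock, as in the decref path
+ * (sketch):
+ *
+ *	lock_res(lock->l_resource);
+ *	spin_lock(&ns->ns_unused_lock);
+ *	list_add_tail(&lock->l_lru, &ns->ns_unused_list);
+ *	spin_unlock(&ns->ns_unused_lock);
+ *	unlock_res(lock->l_resource);
+ */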
+
struct ldlm_lock;
struct ldlm_resource;
struct ldlm_namespace;
char *ns_name;
__u32 ns_client; /* is this a client-side lock tree? */
struct list_head *ns_hash; /* hash table for ns */
+ spinlock_t ns_hash_lock;
__u32 ns_refcount; /* count of resources in the hash */
struct list_head ns_root_list; /* all root resources in ns */
- struct lustre_lock ns_lock; /* protects hash, refcount, list */
struct list_head ns_list_chain; /* position in global NS list */
/*
struct proc_dir_entry *ns_proc_dir;
- struct list_head ns_unused_list; /* all root resources in ns */
+ struct list_head ns_unused_list; /* LRU list of unused locks in ns */
int ns_nr_unused;
+ spinlock_t ns_unused_lock;
+
unsigned int ns_max_unused;
unsigned long ns_next_dump; /* next dump time */
- spinlock_t ns_counter_lock;
- __u64 ns_locks;
+ atomic_t ns_locks;
__u64 ns_resources;
ldlm_res_policy ns_policy;
struct ldlm_valblock_ops *ns_lvbo;
struct ldlm_lock {
struct portals_handle l_handle; // must be first in the structure
atomic_t l_refc;
+
+ /* ldlm_lock_change_resource() can change this */
struct ldlm_resource *l_resource;
+
+ /* set once, no need to protect it */
struct ldlm_lock *l_parent;
+
+ /* protected by ns_hash_lock */
struct list_head l_children;
struct list_head l_childof;
+
+ /* protected by ns_unused_lock */
struct list_head l_lru;
+
+ /* protected by lr_lock */
struct list_head l_res_link; // position in one of three res lists
+
+ /* protected by led_lock */
struct list_head l_export_chain; // per-export chain of locks
+ /* protected by lr_lock */
ldlm_mode_t l_req_mode;
ldlm_mode_t l_granted_mode;
struct obd_export *l_export;
struct obd_export *l_conn_export;
+
+ /* protected by lr_lock */
__u32 l_flags;
+
struct lustre_handle l_remote_handle;
ldlm_policy_data_t l_policy_data;
+ /* protected by lr_lock */
__u32 l_readers;
__u32 l_writers;
__u8 l_destroyed;
void *l_ast_data;
/* Server-side-only members */
+
+ /* protected by elt_lock */
struct list_head l_pending_chain; /* callbacks pending */
unsigned long l_callback_timeout;
__u32 l_pid; /* pid which created this lock */
struct list_head l_tmp;
+
+ /* for ldlm_add_ast_work_item() */
+ struct list_head l_bl_ast;
+ struct list_head l_cp_ast;
+ struct ldlm_lock *l_blocking_lock;
+ int l_bl_ast_run;
};
#define LDLM_PLAIN 10
struct ldlm_resource {
struct ldlm_namespace *lr_namespace;
+
+ /* protected by ns_hash_lock */
struct list_head lr_hash;
struct ldlm_resource *lr_parent; /* 0 for a root resource */
struct list_head lr_children; /* list head for child resources */
struct list_head lr_childof; /* part of ns_root_list if root res,
* part of lr_children if child */
+ spinlock_t lr_lock;
+ /* protected by lr_lock */
struct list_head lr_granted;
struct list_head lr_converting;
struct list_head lr_waiting;
ldlm_mode_t lr_most_restr;
- __u32 lr_type; /* LDLM_PLAIN or LDLM_EXTENT */
+ __u32 lr_type; /* LDLM_{PLAIN,EXTENT,FLOCK,IBITS} */
- struct ldlm_resource *lr_root;
struct ldlm_res_id lr_name;
atomic_t lr_refcount;
CDEBUG(D_DLMTRACE, "### " format "\n" , ## a)
typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, int *flags,
- int first_enq, ldlm_error_t *err);
+ int first_enq, ldlm_error_t *err,
+ struct list_head *work_list);
/*
* Iterators.
#define IOC_LDLM_REGRESS_STOP _IOWR('f', 43, long)
#define IOC_LDLM_MAX_NR 43
+static inline void lock_res(struct ldlm_resource *res)
+{
+ spin_lock(&res->lr_lock);
+}
+
+static inline void unlock_res(struct ldlm_resource *res)
+{
+ spin_unlock(&res->lr_lock);
+}
+
+static inline void check_res_locked(struct ldlm_resource *res)
+{
+ LASSERT_SPIN_LOCKED(&res->lr_lock);
+}
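+
+/* Sketch of typical use: lr_lock guards l_flags and the resource lists,
+ * so updates are bracketed by the helpers above, e.g.:
+ *
+ *	lock_res(lock->l_resource);
+ *	lock->l_flags |= LDLM_FL_CBPENDING;
+ *	unlock_res(lock->l_resource);
+ */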
+
+
#endif
struct ldlm_export_data {
- struct list_head led_held_locks; /* protected by namespace lock */
+ struct list_head led_held_locks; /* protected by led_lock below */
+ spinlock_t led_lock;
};
struct ec_export_data { /* echo client */
void statfs_pack(struct obd_statfs *osfs, struct kstatfs *sfs);
void statfs_unpack(struct kstatfs *sfs, struct obd_statfs *osfs);
-/* l_lock.c */
-struct lustre_lock {
- int l_depth;
- struct task_struct *l_owner;
- struct semaphore l_sem;
- spinlock_t l_spin;
-};
-
-void l_lock_init(struct lustre_lock *);
-void l_lock(struct lustre_lock *);
-void l_unlock(struct lustre_lock *);
-int l_has_lock(struct lustre_lock *);
-
-
/*
* OBD IOCTLS
*/
#include <linux/lustre_dlm.h>
#include <linux/lustre_lib.h>
-/* invariants:
- - only the owner of the lock changes l_owner/l_depth
- - if a non-owner changes or checks the variables a spin lock is taken
-*/
-
-void l_lock_init(struct lustre_lock *lock)
-{
- sema_init(&lock->l_sem, 1);
- spin_lock_init(&lock->l_spin);
-}
-
-void l_lock(struct lustre_lock *lock)
-{
- int owner = 0;
-
- spin_lock(&lock->l_spin);
- if (lock->l_owner == current)
- owner = 1;
- spin_unlock(&lock->l_spin);
-
- /* This is safe to increment outside the spinlock because we
- * can only have 1 CPU running on the current task
- * (i.e. l_owner == current), regardless of the number of CPUs.
- */
- if (owner) {
- ++lock->l_depth;
- } else {
- down(&lock->l_sem);
- spin_lock(&lock->l_spin);
- lock->l_owner = current;
- lock->l_depth = 0;
- spin_unlock(&lock->l_spin);
- }
-}
-
-void l_unlock(struct lustre_lock *lock)
-{
- LASSERTF(lock->l_owner == current, "lock %p, current %p\n",
- lock->l_owner, current);
- LASSERTF(lock->l_depth >= 0, "depth %d\n", lock->l_depth);
- spin_lock(&lock->l_spin);
- if (--lock->l_depth < 0) {
- lock->l_owner = NULL;
- spin_unlock(&lock->l_spin);
- up(&lock->l_sem);
- return;
- }
- spin_unlock(&lock->l_spin);
-}
-
-int l_has_lock(struct lustre_lock *lock)
-{
- int depth = -1, owner = 0;
-
- spin_lock(&lock->l_spin);
- if (lock->l_owner == current) {
- depth = lock->l_depth;
- owner = 1;
- }
- spin_unlock(&lock->l_spin);
-
- if (depth >= 0)
- CDEBUG(D_INFO, "lock_depth: %d\n", depth);
- return owner;
-}
-
-#ifdef __KERNEL__
-#include <linux/lustre_version.h>
-void l_check_no_ns_lock(struct ldlm_namespace *ns)
-{
- static unsigned long next_msg;
-
- if (l_has_lock(&ns->ns_lock) && time_after(jiffies, next_msg)) {
- CERROR("namespace %s lock held illegally; tell phil\n",
- ns->ns_name);
- portals_debug_dumpstack(NULL);
- next_msg = jiffies + 60 * HZ;
- }
-}
-
-#else
-void l_check_no_ns_lock(struct ldlm_namespace *ns)
-{
- if (l_has_lock(&ns->ns_lock)) {
- CERROR("namespace %s lock held illegally; tell phil\n",
- ns->ns_name);
- }
-}
-#endif /* __KERNEL__ */
*/
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
- int send_cbs, int *flags, ldlm_error_t *err)
+ int *flags, ldlm_error_t *err,
+ struct list_head *work_list)
{
struct list_head *tmp;
struct ldlm_lock *lock;
continue;
}
- if (!send_cbs)
+ if (!work_list)
RETURN(0);
compat = 0;
if (lock->l_blocking_ast)
- ldlm_add_ast_work_item(lock, req, NULL, 0);
+ ldlm_add_ast_work_item(lock, req, work_list);
}
return(compat);
- * - the caller has NOT initialized req->lr_tmp, so we must
- * - must call this function with the ns lock held once */
+ * - must call this function with lr_lock held */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
- ldlm_error_t *err)
+ ldlm_error_t *err, struct list_head *work_list)
{
struct ldlm_resource *res = lock->l_resource;
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
* flags should always be zero here, and if that ever stops
* being true, we want to find out. */
LASSERT(*flags == 0);
- LASSERT(res->lr_tmp != NULL);
- rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0, flags,
- err);
+ rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
+ err, NULL);
if (rc == 1) {
- rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0,
- flags, err);
+ rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
+ flags, err, NULL);
}
if (rc == 0)
RETURN(LDLM_ITER_STOP);
ldlm_resource_unlink_lock(lock);
-
ldlm_extent_policy(res, lock, flags);
- ldlm_grant_lock(lock, NULL, 0, 1);
+ ldlm_grant_lock(lock, work_list);
RETURN(LDLM_ITER_CONTINUE);
}
restart:
- LASSERT(res->lr_tmp == NULL);
- res->lr_tmp = &rpc_list;
- rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1, flags, err);
+ rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
+ &rpc_list);
if (rc < 0)
GOTO(out, rc); /* lock was destroyed */
if (rc == 2) {
- res->lr_tmp = NULL;
goto grant;
}
- rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, 1, flags, err);
+ rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
+ &rpc_list);
if (rc2 < 0)
GOTO(out, rc = rc2); /* lock was destroyed */
- res->lr_tmp = NULL;
if (rc + rc2 == 2) {
grant:
ldlm_extent_policy(res, lock, flags);
ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL, 0, 0);
+ ldlm_grant_lock(lock, NULL);
} else {
/* If either of the compat_queue()s returned failure, then we
* have ASTs to send and must go onto the waiting list.
* re-ordered! Causes deadlock, because ASTs aren't sent! */
if (list_empty(&lock->l_res_link))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
- l_unlock(&res->lr_namespace->ns_lock);
- rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
- l_lock(&res->lr_namespace->ns_lock);
+ unlock_res(res);
+ rc = ldlm_run_bl_ast_work(&rpc_list);
+ lock_res(res);
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
*flags |= LDLM_FL_BLOCK_GRANTED;
}
rc = 0;
out:
- res->lr_tmp = NULL;
RETURN(rc);
}
int
ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
- ldlm_error_t *err)
+ ldlm_error_t *err, struct list_head *work_list)
{
struct ldlm_resource *res = req->l_resource;
struct ldlm_namespace *ns = res->lr_namespace;
&new2->l_export->exp_ldlm_data.led_held_locks);
}
if (*flags == LDLM_FL_WAIT_NOREPROC)
- ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+ ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
/* insert new2 at lock */
ldlm_resource_add_lock(res, ownlocks, new2);
= LIST_HEAD_INIT(rpc_list);
int rc;
restart:
- res->lr_tmp = &rpc_list;
- ldlm_reprocess_queue(res, &res->lr_waiting);
- res->lr_tmp = NULL;
-
- l_unlock(&ns->ns_lock);
- rc = ldlm_run_ast_work(res->lr_namespace,
- &rpc_list);
- l_lock(&ns->ns_lock);
+ ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
+ unlock_res(res);
+ rc = ldlm_run_cp_ast_work(&rpc_list);
+ lock_res(res);
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
}
} else {
LASSERT(req->l_completion_ast);
- ldlm_add_ast_work_item(req, NULL, NULL, 0);
+ ldlm_add_ast_work_item(req, NULL, NULL);
}
}
LDLM_DEBUG(lock, "client-side enqueue granted");
ns = lock->l_resource->lr_namespace;
- l_lock(&ns->ns_lock);
+ lock_res(lock->l_resource);
/* take lock off the deadlock detection waitq. */
list_del_init(&lock->l_flock_waitq);
/* We need to reprocess the lock to do merges or splits
* with existing locks owned by this process. */
- ldlm_process_flock_lock(lock, &noreproc, 1, &err);
+ ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
if (flags == 0)
wake_up(&lock->l_waitq);
}
- l_unlock(&ns->ns_lock);
+ unlock_res(lock->l_resource);
RETURN(0);
}
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
void *data, int flag)
{
- struct ldlm_namespace *ns;
ENTRY;
LASSERT(lock);
LASSERT(flag == LDLM_CB_CANCELING);
- ns = lock->l_resource->lr_namespace;
-
/* take lock off the deadlock detection waitq. */
- l_lock(&ns->ns_lock);
+ lock_res(lock->l_resource);
list_del_init(&lock->l_flock_waitq);
- l_unlock(&ns->ns_lock);
+ unlock_res(lock->l_resource);
RETURN(0);
}
/* Determine if the lock is compatible with all locks on the queue. */
static int
ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
- int send_cbs)
+ struct list_head *work_list)
{
struct list_head *tmp;
struct ldlm_lock *lock;
if (!(lock->l_policy_data.l_inodebits.bits & req_bits))
continue;
- if (!send_cbs)
+ if (!work_list)
RETURN(0);
compat = 0;
if (lock->l_blocking_ast)
- ldlm_add_ast_work_item(lock, req, NULL, 0);
+ ldlm_add_ast_work_item(lock, req, work_list);
}
RETURN(compat);
- * - the caller has NOT initialized req->lr_tmp, so we must
- * - must call this function with the ns lock held once */
+ * - must call this function with lr_lock held */
int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
- int first_enq, ldlm_error_t *err)
+ int first_enq, ldlm_error_t *err,
+ struct list_head *work_list)
{
struct ldlm_resource *res = lock->l_resource;
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
ENTRY;
LASSERT(list_empty(&res->lr_converting));
+ check_res_locked(res);
if (!first_enq) {
- LASSERT(res->lr_tmp != NULL);
- rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 0);
+ LASSERT(work_list != NULL);
+ rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL);
if (!rc)
RETURN(LDLM_ITER_STOP);
- rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 0);
+ rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL);
if (!rc)
RETURN(LDLM_ITER_STOP);
ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL, 0, 1);
+ ldlm_grant_lock(lock, work_list);
RETURN(LDLM_ITER_CONTINUE);
}
restart:
- LASSERT(res->lr_tmp == NULL);
- res->lr_tmp = &rpc_list;
- rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 1);
- rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 1);
- res->lr_tmp = NULL;
+ rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list);
+ rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list);
if (rc != 2) {
/* If either of the compat_queue()s returned 0, then we
* re-ordered! Causes deadlock, because ASTs aren't sent! */
if (list_empty(&lock->l_res_link))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
- l_unlock(&res->lr_namespace->ns_lock);
- rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
- l_lock(&res->lr_namespace->ns_lock);
+ unlock_res(res);
+ rc = ldlm_run_bl_ast_work(&rpc_list);
+ lock_res(res);
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
*flags |= LDLM_FL_BLOCK_GRANTED;
} else {
ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL, 0, 0);
+ ldlm_grant_lock(lock, NULL);
}
RETURN(0);
}
/* ldlm_resource.c */
void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
struct ldlm_lock *new);
+int ldlm_resource_putref_locked(struct ldlm_resource *res);
/* ldlm_lock.c */
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
- int run_ast);
+void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list);
+
struct ldlm_lock *
ldlm_lock_create(struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle, struct ldlm_res_id,
__u32 lvb_len);
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
void *cookie, int *flags);
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, __u32 mode);
void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode);
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, int datalen);
-int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue);
-int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list);
+ struct list_head *work_list);
+int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
+ struct list_head *work_list);
+int ldlm_run_bl_ast_work(struct list_head *rpc_list);
+int ldlm_run_cp_ast_work(struct list_head *rpc_list);
/* ldlm_lockd.c */
int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
/* ldlm_plain.c */
int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
- ldlm_error_t *err);
+ ldlm_error_t *err, struct list_head *work_list);
/* ldlm_extent.c */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
- ldlm_error_t *err);
+ ldlm_error_t *err, struct list_head *work_list);
/* ldlm_flock.c */
-int ldlm_process_flock_lock(struct ldlm_lock *lock, int *flags, int first_enq,
- ldlm_error_t *err);
+int ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
+ ldlm_error_t *err, struct list_head *work_list);
/* ldlm_inodebits.c */
int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
- int first_enq, ldlm_error_t *err);
+ int first_enq, ldlm_error_t *err,
+ struct list_head *work_list);
-/* l_lock.c */
-void l_check_no_ns_lock(struct ldlm_namespace *ns);
}
extern kmem_cache_t *ldlm_lock_slab;
-struct lustre_lock ldlm_handle_lock;
static ldlm_processing_policy ldlm_processing_policy_table[] = {
[LDLM_PLAIN] ldlm_process_plain_lock,
{
ENTRY;
+ LASSERT(lock->l_resource != LP_POISON);
+ LASSERT(atomic_read(&lock->l_refc) > 0);
if (atomic_dec_and_test(&lock->l_refc)) {
- struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+ struct ldlm_resource *res = lock->l_resource;
+ struct ldlm_namespace *ns = res->lr_namespace;
- l_lock(&ns->ns_lock);
LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
+
+ LASSERT(lock->l_resource != LP_POISON);
+ lock_res(res);
LASSERT(lock->l_destroyed);
LASSERT(list_empty(&lock->l_res_link));
- spin_lock(&ns->ns_counter_lock);
- ns->ns_locks--;
- spin_unlock(&ns->ns_counter_lock);
+ if (lock->l_parent)
+ LDLM_LOCK_PUT(lock->l_parent);
+ unlock_res(res);
ldlm_resource_putref(lock->l_resource);
lock->l_resource = NULL;
if (lock->l_export)
class_export_put(lock->l_export);
-
- if (lock->l_parent)
- LDLM_LOCK_PUT(lock->l_parent);
+ atomic_dec(&ns->ns_locks);
if (lock->l_lvb_data != NULL)
OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);
OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
- l_unlock(&ns->ns_lock);
}
EXIT;
void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
ENTRY;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ spin_lock(&lock->l_resource->lr_namespace->ns_unused_lock);
if (!list_empty(&lock->l_lru)) {
LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
list_del_init(&lock->l_lru);
lock->l_resource->lr_namespace->ns_nr_unused--;
LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
}
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ spin_unlock(&lock->l_resource->lr_namespace->ns_unused_lock);
EXIT;
}
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
ENTRY;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
+ lock_res(lock->l_resource);
if (!list_empty(&lock->l_children)) {
LDLM_ERROR(lock, "still has children (%p)!",
if (lock->l_destroyed) {
LASSERT(list_empty(&lock->l_lru));
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
EXIT;
return;
}
lock->l_destroyed = 1;
- list_del_init(&lock->l_export_chain);
+ if (lock->l_export) {
+ spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
+ if (!list_empty(&lock->l_export_chain))
+ list_del_init(&lock->l_export_chain);
+ spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
+ } else {
+ LASSERT(list_empty(&lock->l_export_chain));
+ }
+
ldlm_lock_remove_from_lru(lock);
class_handle_unhash(&lock->l_handle);
lock->l_completion_ast(lock, 0);
#endif
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
LDLM_LOCK_PUT(lock);
EXIT;
}
INIT_LIST_HEAD(&lock->l_export_chain);
INIT_LIST_HEAD(&lock->l_pending_chain);
INIT_LIST_HEAD(&lock->l_tmp);
+ INIT_LIST_HEAD(&lock->l_bl_ast);
+ INIT_LIST_HEAD(&lock->l_cp_ast);
init_waitqueue_head(&lock->l_waitq);
+ lock->l_blocking_lock = NULL;
- spin_lock(&resource->lr_namespace->ns_counter_lock);
- resource->lr_namespace->ns_locks++;
- spin_unlock(&resource->lr_namespace->ns_counter_lock);
+ atomic_inc(&resource->lr_namespace->ns_locks);
if (parent != NULL) {
- l_lock(&parent->l_resource->lr_namespace->ns_lock);
+ spin_lock(&resource->lr_namespace->ns_hash_lock);
lock->l_parent = LDLM_LOCK_GET(parent);
list_add(&lock->l_childof, &parent->l_children);
- l_unlock(&parent->l_resource->lr_namespace->ns_lock);
+ spin_unlock(&resource->lr_namespace->ns_hash_lock);
}
INIT_LIST_HEAD(&lock->l_handle.h_link);
struct ldlm_resource *oldres = lock->l_resource;
ENTRY;
- l_lock(&ns->ns_lock);
+ lock_res(oldres);
if (memcmp(&new_resid, &lock->l_resource->lr_name,
sizeof(lock->l_resource->lr_name)) == 0) {
/* Nothing to do */
- l_unlock(&ns->ns_lock);
+ unlock_res(oldres);
RETURN(0);
}
RETURN(-ENOMEM);
}
+ unlock_res(oldres);
+
/* ...and the flowers are still standing! */
ldlm_resource_putref(oldres);
- l_unlock(&ns->ns_lock);
RETURN(0);
}
ns = lock->l_resource->lr_namespace;
LASSERT(ns != NULL);
- l_lock(&ns->ns_lock);
+ lock_res(lock->l_resource);
/* It's unlikely but possible that someone marked the lock as
* destroyed after we did handle2object on it */
if (lock->l_destroyed) {
+ unlock_res(lock->l_resource);
CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
LDLM_LOCK_PUT(lock);
GOTO(out, retval);
}
if (flags && (lock->l_flags & flags)) {
+ unlock_res(lock->l_resource);
LDLM_LOCK_PUT(lock);
GOTO(out, retval);
}
if (flags)
lock->l_flags |= flags;
+ unlock_res(lock->l_resource);
retval = lock;
EXIT;
out:
- l_unlock(&ns->ns_lock);
return retval;
}
struct lustre_handle *handle)
{
struct ldlm_lock *retval = NULL;
-
- l_lock(&ns->ns_lock);
retval = __ldlm_handle2lock(handle, 0);
- l_unlock(&ns->ns_lock);
-
return retval;
}
sizeof(desc->l_policy_data));
}
-void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, int datalen)
+void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
+ struct list_head *work_list)
{
- struct ldlm_ast_work *w;
- ENTRY;
-
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
- if (new && (lock->l_flags & LDLM_FL_AST_SENT))
- GOTO(out, 0);
-
- CDEBUG(D_OTHER, "lock %p incompatible; sending blocking AST.\n", lock);
-
- OBD_ALLOC(w, sizeof(*w));
- if (!w) {
- LBUG();
- GOTO(out, 0);
- }
-
- w->w_data = data;
- w->w_datalen = datalen;
- if (new) {
+ if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
lock->l_flags |= LDLM_FL_AST_SENT;
/* If the enqueuing client said so, tell the AST recipient to
* discard dirty data, rather than writing back. */
if (new->l_flags & LDLM_AST_DISCARD_DATA)
lock->l_flags |= LDLM_FL_DISCARD_DATA;
- w->w_blocking = 1;
- ldlm_lock2desc(new, &w->w_desc);
+ LASSERT(list_empty(&lock->l_bl_ast));
+ list_add(&lock->l_bl_ast, work_list);
+ LDLM_LOCK_GET(lock);
+ LASSERT(lock->l_blocking_lock == NULL);
+ lock->l_blocking_lock = LDLM_LOCK_GET(new);
}
+}
+
+void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
+{
+ if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
+ LDLM_DEBUG(lock, "lock granted; sending completion AST.");
+ lock->l_flags |= LDLM_FL_CP_REQD;
+ LASSERT(list_empty(&lock->l_cp_ast));
+ list_add(&lock->l_cp_ast, work_list);
+ LDLM_LOCK_GET(lock);
+ }
+}
- w->w_lock = LDLM_LOCK_GET(lock);
- list_add(&w->w_list, lock->l_resource->lr_tmp);
+/* must be called with lr_lock held */
+void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
+ struct list_head *work_list)
+{
+ ENTRY;
+ check_res_locked(lock->l_resource);
+ if (new)
+ ldlm_add_bl_work_item(lock, new, work_list);
+ else
+ ldlm_add_cp_work_item(lock, work_list);
EXIT;
- out:
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}
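+
+/* Sketch of the intended pattern: work items are gathered under lr_lock by
+ * the helpers above, then executed only after the lock is dropped:
+ *
+ *	ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
+ *	unlock_res(res);
+ *	rc = ldlm_run_cp_ast_work(&rpc_list);
+ */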
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
LDLM_LOCK_PUT(lock);
}
-/* only called for local locks */
-void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_lock_remove_from_lru(lock);
if (mode & (LCK_NL | LCK_CR | LCK_PR))
lock->l_readers++;
lock->l_last_used = jiffies;
LDLM_LOCK_GET(lock);
LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+}
+
+/* only called for local locks */
+void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
+{
+ lock_res(lock->l_resource);
+ ldlm_lock_addref_internal_nolock(lock, mode);
+ unlock_res(lock->l_resource);
}
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
ns = lock->l_resource->lr_namespace;
- l_lock(&ns->ns_lock);
+ lock_res(lock->l_resource);
+
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
LASSERT(lock->l_readers > 0);
LDLM_LOCK_GET(lock); /* dropped by bl thread */
ldlm_lock_remove_from_lru(lock);
- l_unlock(&ns->ns_lock);
- if (ldlm_bl_to_thread(ns, NULL, lock) != 0)
+ unlock_res(lock->l_resource);
+ if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
+ ldlm_bl_to_thread(ns, NULL, lock) != 0)
ldlm_handle_bl_callback(ns, NULL, lock);
} else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
!lock->l_readers && !lock->l_writers) {
* reference, put it on the LRU. */
LASSERT(list_empty(&lock->l_lru));
LASSERT(ns->ns_nr_unused >= 0);
+ spin_lock(&ns->ns_unused_lock);
list_add_tail(&lock->l_lru, &ns->ns_unused_list);
ns->ns_nr_unused++;
- l_unlock(&ns->ns_lock);
+ spin_unlock(&ns->ns_unused_lock);
+ unlock_res(lock->l_resource);
ldlm_cancel_lru(ns, LDLM_ASYNC);
} else {
- l_unlock(&ns->ns_lock);
+ unlock_res(lock->l_resource);
}
LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
LASSERT(lock != NULL);
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
lock->l_flags |= LDLM_FL_CBPENDING;
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
ldlm_lock_decref_internal(lock, mode);
LDLM_LOCK_PUT(lock);
}
* - ldlm_lock_enqueue
* - ldlm_reprocess_queue
* - ldlm_lock_convert
+ *
+ * must be called with lr_lock held
*/
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
- int run_ast)
+void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
struct ldlm_resource *res = lock->l_resource;
ENTRY;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ check_res_locked(res);
+
lock->l_granted_mode = lock->l_req_mode;
ldlm_resource_add_lock(res, &res->lr_granted, lock);
if (lock->l_granted_mode < res->lr_most_restr)
res->lr_most_restr = lock->l_granted_mode;
- if (run_ast && lock->l_completion_ast != NULL)
- ldlm_add_ast_work_item(lock, NULL, data, datalen);
+ if (work_list && lock->l_completion_ast != NULL)
+ ldlm_add_ast_work_item(lock, NULL, work_list);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
EXIT;
}
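+
+/* Sketch of the two calling modes of ldlm_grant_lock(): with a work_list
+ * the completion AST is queued for later; with NULL none is queued:
+ *
+ *	ldlm_resource_unlink_lock(lock);
+ *	ldlm_grant_lock(lock, &rpc_list);	(completion AST queued)
+ *	...
+ *	ldlm_grant_lock(lock, NULL);		(no AST work queued)
+ */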
if (flags & LDLM_FL_TEST_LOCK)
LDLM_LOCK_GET(lock);
else
- ldlm_lock_addref_internal(lock, mode);
+ ldlm_lock_addref_internal_nolock(lock, mode);
return lock;
}
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
lock->l_flags |= LDLM_FL_CAN_MATCH;
wake_up(&lock->l_waitq);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
}
/* Can be called in two ways:
RETURN(0);
}
- l_lock(&ns->ns_lock);
+ lock_res(res);
lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
if (lock != NULL)
EXIT;
out:
+ unlock_res(res);
ldlm_resource_putref(res);
- l_unlock(&ns->ns_lock);
if (lock) {
ldlm_lock2handle(lock, lockh);
out2:
if (rc) {
- l_lock(&ns->ns_lock);
LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
type == LDLM_PLAIN ? res_id->name[2] :
policy->l_extent.start,
type == LDLM_PLAIN ? res_id->name[3] :
policy->l_extent.end);
- l_unlock(&ns->ns_lock);
} else if (!(flags & LDLM_FL_TEST_LOCK)) {/* less verbose for test-only */
LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
LASSERT(rc == ELDLM_OK);
}
- l_lock(&ns->ns_lock);
+ lock_res(lock->l_resource);
if (local && lock->l_req_mode == lock->l_granted_mode) {
/* The server returned a blocked lock, but it was granted before
* we got a chance to actually enqueue it. We don't need to do
else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
else
- ldlm_grant_lock(lock, NULL, 0, 0);
+ ldlm_grant_lock(lock, NULL);
GOTO(out, ELDLM_OK);
} else if (*flags & LDLM_FL_REPLAY) {
if (*flags & LDLM_FL_BLOCK_CONV) {
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
GOTO(out, ELDLM_OK);
} else if (*flags & LDLM_FL_BLOCK_GRANTED) {
- ldlm_grant_lock(lock, NULL, 0, 0);
+ ldlm_grant_lock(lock, NULL);
GOTO(out, ELDLM_OK);
}
/* If no flags, fall through to normal enqueue path. */
}
policy = ldlm_processing_policy_table[res->lr_type];
- policy(lock, flags, 1, &rc);
+ policy(lock, flags, 1, &rc, NULL);
EXIT;
out:
- l_unlock(&ns->ns_lock);
+ unlock_res(lock->l_resource);
return rc;
}
-/* Must be called with namespace taken: queue is waiting or converting. */
+/* Must be called with resource lock held: queue is waiting or converting. */
-int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
+int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
+ struct list_head *work_list)
{
struct list_head *tmp, *pos;
ldlm_processing_policy policy;
ldlm_error_t err;
ENTRY;
+ check_res_locked(res);
+
policy = ldlm_processing_policy_table[res->lr_type];
LASSERT(policy);
CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
flags = 0;
- rc = policy(pending, &flags, 0, &err);
+ rc = policy(pending, &flags, 0, &err, work_list);
if (rc != LDLM_ITER_CONTINUE)
break;
}
RETURN(rc);
}
-int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list)
+int ldlm_run_bl_ast_work(struct list_head *rpc_list)
{
struct list_head *tmp, *pos;
- int rc, retval = 0;
+ struct ldlm_lock_desc d;
+ int rc = 0, retval = 0;
ENTRY;
- l_check_no_ns_lock(ns);
+ list_for_each_safe(tmp, pos, rpc_list) {
+ struct ldlm_lock *lock =
+ list_entry(tmp, struct ldlm_lock, l_bl_ast);
+
+ /* nobody should touch l_bl_ast */
+ lock_res(lock->l_resource);
+ list_del_init(&lock->l_bl_ast);
+
+ LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+ LASSERT(lock->l_bl_ast_run == 0);
+ LASSERT(lock->l_blocking_lock);
+ lock->l_bl_ast_run++;
+ unlock_res(lock->l_resource);
+
+ ldlm_lock2desc(lock->l_blocking_lock, &d);
+
+ LDLM_LOCK_PUT(lock->l_blocking_lock);
+ lock->l_blocking_lock = NULL;
+ rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING);
+
+ if (rc == -ERESTART)
+ retval = rc;
+ else if (rc)
+ CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
+ "disconnect client\n");
+ LDLM_LOCK_PUT(lock);
+ }
+ RETURN(retval);
+}
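+
+/* Callers drop lr_lock around the blocking-AST run, as in the reprocess
+ * paths (sketch):
+ *
+ *	unlock_res(res);
+ *	rc = ldlm_run_bl_ast_work(&rpc_list);
+ *	lock_res(res);
+ *	if (rc == -ERESTART)
+ *		GOTO(restart, -ERESTART);
+ */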
+
+int ldlm_run_cp_ast_work(struct list_head *rpc_list)
+{
+ struct list_head *tmp, *pos;
+ int rc = 0, retval = 0;
+ ENTRY;
+
+ /* It's possible to receive a completion AST before we've set
+ * the l_completion_ast pointer: either because the AST arrived
+ * before the reply, or simply because there's a small race
+ * window between receiving the reply and finishing the local
+ * enqueue. (bug 842)
+ *
+ * This can't happen with the blocking_ast, however, because we
+ * will never call the local blocking_ast until we drop our
+ * reader/writer reference, which we won't do until we get the
+ * reply and finish enqueueing. */
list_for_each_safe(tmp, pos, rpc_list) {
- struct ldlm_ast_work *w =
- list_entry(tmp, struct ldlm_ast_work, w_list);
-
- /* It's possible to receive a completion AST before we've set
- * the l_completion_ast pointer: either because the AST arrived
- * before the reply, or simply because there's a small race
- * window between receiving the reply and finishing the local
- * enqueue. (bug 842)
- *
- * This can't happen with the blocking_ast, however, because we
- * will never call the local blocking_ast until we drop our
- * reader/writer reference, which we won't do until we get the
- * reply and finish enqueueing. */
- LASSERT(w->w_lock != NULL);
- if (w->w_blocking) {
- LASSERT(w->w_lock->l_blocking_ast != NULL);
- rc = w->w_lock->l_blocking_ast
- (w->w_lock, &w->w_desc, w->w_data,
- LDLM_CB_BLOCKING);
- } else if (w->w_lock->l_completion_ast != NULL) {
- LASSERT(w->w_lock->l_completion_ast != NULL);
- rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags,
- w->w_data);
- } else {
- rc = 0;
- }
+ struct ldlm_lock *lock =
+ list_entry(tmp, struct ldlm_lock, l_cp_ast);
+
+ /* nobody should touch l_cp_ast */
+ lock_res(lock->l_resource);
+ list_del_init(&lock->l_cp_ast);
+ LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
+ lock->l_flags &= ~LDLM_FL_CP_REQD;
+ unlock_res(lock->l_resource);
+
+ if (lock->l_completion_ast != NULL)
+ rc = lock->l_completion_ast(lock, 0, NULL);
+
if (rc == -ERESTART)
retval = rc;
else if (rc)
CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
"disconnect client\n");
- LDLM_LOCK_PUT(w->w_lock);
- list_del(&w->w_list);
- OBD_FREE(w, sizeof(*w));
+ LDLM_LOCK_PUT(lock);
}
RETURN(retval);
}
void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
{
+ struct list_head *tmp;
int i, rc;
- l_lock(&ns->ns_lock);
+ spin_lock(&ns->ns_hash_lock);
for (i = 0; i < RES_HASH_SIZE; i++) {
- struct list_head *tmp, *next;
- list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
+ tmp = ns->ns_hash[i].next;
+ while (tmp != &(ns->ns_hash[i])) {
struct ldlm_resource *res =
list_entry(tmp, struct ldlm_resource, lr_hash);
ldlm_resource_getref(res);
- l_unlock(&ns->ns_lock);
+ spin_unlock(&ns->ns_hash_lock);
+
rc = reprocess_one_queue(res, NULL);
- l_lock(&ns->ns_lock);
- next = tmp->next;
- ldlm_resource_putref(res);
+
+ spin_lock(&ns->ns_hash_lock);
+ tmp = tmp->next;
+ ldlm_resource_putref_locked(res);
+
if (rc == LDLM_ITER_STOP)
GOTO(out, rc);
}
}
out:
- l_unlock(&ns->ns_lock);
+ spin_unlock(&ns->ns_hash_lock);
EXIT;
}
}
restart:
- l_lock(&res->lr_namespace->ns_lock);
- res->lr_tmp = &rpc_list;
-
- rc = ldlm_reprocess_queue(res, &res->lr_converting);
+ lock_res(res);
+ rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
if (rc == LDLM_ITER_CONTINUE)
- ldlm_reprocess_queue(res, &res->lr_waiting);
-
- res->lr_tmp = NULL;
- l_unlock(&res->lr_namespace->ns_lock);
+ ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
+ unlock_res(res);
- rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
+ rc = ldlm_run_cp_ast_work(&rpc_list);
if (rc == -ERESTART) {
LASSERT(list_empty(&rpc_list));
goto restart;
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ check_res_locked(lock->l_resource);
+
if (!(lock->l_flags & LDLM_FL_CANCEL)) {
lock->l_flags |= LDLM_FL_CANCEL;
if (lock->l_blocking_ast) {
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- // l_check_no_ns_lock(lock->l_resource->lr_namespace);
+ unlock_res(lock->l_resource);
lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
LDLM_CB_CANCELING);
- return;
+ lock_res(lock->l_resource);
} else {
LDLM_DEBUG(lock, "no blocking ast");
}
}
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}
void ldlm_lock_cancel(struct ldlm_lock *lock)
res = lock->l_resource;
ns = res->lr_namespace;
- l_lock(&ns->ns_lock);
ldlm_del_waiting_lock(lock);
-
+ lock_res(res);
+
/* Please do not, no matter how tempting, remove this LBUG without
* talking to me first. -phik */
if (lock->l_readers || lock->l_writers) {
ldlm_cancel_callback(lock);
ldlm_resource_unlink_lock(lock);
+ unlock_res(res);
+
ldlm_lock_destroy(lock);
- l_unlock(&ns->ns_lock);
+
EXIT;
}
void ldlm_cancel_locks_for_export(struct obd_export *exp)
{
- struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
struct ldlm_lock *lock;
struct ldlm_resource *res;
- l_lock(&ns->ns_lock);
+ spin_lock(&exp->exp_ldlm_data.led_lock);
while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
struct ldlm_lock, l_export_chain);
res = ldlm_resource_getref(lock->l_resource);
+ LDLM_LOCK_GET(lock);
+ spin_unlock(&exp->exp_ldlm_data.led_lock);
+
LDLM_DEBUG(lock, "export %p", exp);
ldlm_lock_cancel(lock);
- l_unlock(&ns->ns_lock);
ldlm_reprocess_all(res);
+
ldlm_resource_putref(res);
- l_lock(&ns->ns_lock);
+ LDLM_LOCK_PUT(lock);
+ spin_lock(&exp->exp_ldlm_data.led_lock);
}
- l_unlock(&ns->ns_lock);
+ spin_unlock(&exp->exp_ldlm_data.led_lock);
}
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
res = lock->l_resource;
ns = res->lr_namespace;
- l_lock(&ns->ns_lock);
+ lock_res(res);
old_mode = lock->l_req_mode;
lock->l_req_mode = new_mode;
*flags);
LBUG();
- res->lr_tmp = &rpc_list;
- ldlm_grant_lock(lock, NULL, 0, 0);
- res->lr_tmp = NULL;
+ ldlm_grant_lock(lock, &rpc_list);
granted = 1;
- /* FIXME: completion handling not with ns_lock held ! */
+ /* FIXME: completion handling not with lr_lock held! */
if (lock->l_completion_ast)
int pflags = 0;
ldlm_processing_policy policy;
policy = ldlm_processing_policy_table[res->lr_type];
- res->lr_tmp = &rpc_list;
- rc = policy(lock, &pflags, 0, &err);
- res->lr_tmp = NULL;
+ rc = policy(lock, &pflags, 0, &err, &rpc_list);
if (rc == LDLM_ITER_STOP) {
lock->l_req_mode = old_mode;
ldlm_resource_add_lock(res, &res->lr_granted, lock);
granted = 1;
}
}
-
- l_unlock(&ns->ns_lock);
+ unlock_res(res);
if (granted)
- ldlm_run_ast_work(ns, &rpc_list);
+ ldlm_run_cp_ast_work(&rpc_list);
RETURN(res);
}
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
-extern struct lustre_lock ldlm_handle_lock;
extern struct list_head ldlm_namespace_list;
static DECLARE_MUTEX(ldlm_ref_sem);
ldlm_lock_cancel(lock);
rc = -ERESTART;
} else {
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_del_waiting_lock(lock);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_failed_ast(lock, rc, ast_type);
}
} else if (rc) {
LASSERT(lock);
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
- if (lock->l_granted_mode != lock->l_req_mode) {
- /* this blocking AST will be communicated as part of the
- * completion AST instead */
- LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- RETURN(0);
- }
-
- if (lock->l_destroyed) {
- /* What's the point? */
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- RETURN(0);
- }
-
#if 0
if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK,
1, &size, NULL);
- if (req == NULL) {
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ if (req == NULL)
RETURN(-ENOMEM);
+
+ lock_res(lock->l_resource);
+ if (lock->l_granted_mode != lock->l_req_mode) {
+ /* this blocking AST will be communicated as part of the
+ * completion AST instead */
+ unlock_res(lock->l_resource);
+ LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
+ ptlrpc_req_finished(req);
+ RETURN(0);
+ }
+
+ if (lock->l_destroyed) {
+ /* What's the point? */
+ unlock_res(lock->l_resource);
+ ptlrpc_req_finished(req);
+ RETURN(0);
}
body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
if (lock->l_granted_mode == lock->l_req_mode)
ldlm_add_waiting_lock(lock);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
req->rq_send_state = LUSTRE_IMP_FULL;
req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
if (total_enqueue_wait / 1000000 > obd_timeout)
LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
- down(&lock->l_resource->lr_lvb_sem);
+ lock_res(lock->l_resource);
if (lock->l_resource->lr_lvb_len) {
buffers = 2;
size[1] = lock->l_resource->lr_lvb_len;
}
- up(&lock->l_resource->lr_lvb_sem);
+ unlock_res(lock->l_resource);
req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
if (buffers == 2) {
void *lvb;
- down(&lock->l_resource->lr_lvb_sem);
lvb = lustre_msg_buf(req->rq_reqmsg, 1,
lock->l_resource->lr_lvb_len);
-
+ lock_res(lock->l_resource);
memcpy(lvb, lock->l_resource->lr_lvb_data,
lock->l_resource->lr_lvb_len);
- up(&lock->l_resource->lr_lvb_sem);
+ unlock_res(lock->l_resource);
}
LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
/* We only send real blocking ASTs after the lock is granted */
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
if (lock->l_flags & LDLM_FL_AST_SENT) {
body->lock_flags |= LDLM_FL_AST_SENT;
ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
}
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
rc = ptlrpc_queue_wait(req);
if (rc != 0)
sizeof(body->lock_handle1));
ldlm_lock2desc(lock, &body->lock_desc);
- down(&lock->l_resource->lr_lvb_sem);
+ lock_res(lock->l_resource);
size = lock->l_resource->lr_lvb_len;
- up(&lock->l_resource->lr_lvb_sem);
+ unlock_res(lock->l_resource);
req->rq_replen = lustre_msg_size(1, &size);
req->rq_send_state = LUSTRE_IMP_FULL;
static struct ldlm_lock *
find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl)
{
- struct obd_device *obd = exp->exp_obd;
struct list_head *iter;
- l_lock(&obd->obd_namespace->ns_lock);
+ spin_lock(&exp->exp_ldlm_data.led_lock);
list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
struct ldlm_lock *lock;
lock = list_entry(iter, struct ldlm_lock, l_export_chain);
if (lock->l_remote_handle.cookie == remote_hdl->cookie) {
LDLM_LOCK_GET(lock);
- l_unlock(&obd->obd_namespace->ns_lock);
+ spin_unlock(&exp->exp_ldlm_data.led_lock);
return lock;
}
}
- l_unlock(&obd->obd_namespace->ns_lock);
+ spin_unlock(&exp->exp_ldlm_data.led_lock);
return NULL;
}
LASSERT(req->rq_export);
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (req->rq_export->exp_failed) {
LDLM_ERROR(lock,"lock on destroyed export %p\n",req->rq_export);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
GOTO(out, err = -ENOTCONN);
}
- lock->l_export = class_export_get(req->rq_export);
+ lock->l_export = class_export_get(req->rq_export);
+ spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
list_add(&lock->l_export_chain,
&lock->l_export->exp_ldlm_data.led_held_locks);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
existing_lock:
cookie = req;
} else {
int buffers = 1;
- down(&lock->l_resource->lr_lvb_sem);
+ lock_res(lock->l_resource);
if (lock->l_resource->lr_lvb_len) {
size[1] = lock->l_resource->lr_lvb_len;
buffers = 2;
}
- up(&lock->l_resource->lr_lvb_sem);
+ unlock_res(lock->l_resource);
if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
GOTO(out, rc = -ENOMEM);
/* We never send a blocking AST until the lock is granted, but
* we can tell it right now */
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
if (lock->l_flags & LDLM_FL_AST_SENT) {
dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
if (lock->l_granted_mode == lock->l_req_mode)
ldlm_add_waiting_lock(lock);
}
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
EXIT;
out:
/* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
* ldlm_reprocess_all. If this moves, revisit that code. -phil */
if (lock) {
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
"(err=%d, rc=%d)", err, rc);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
if (rc == 0) {
- down(&lock->l_resource->lr_lvb_sem);
+ lock_res(lock->l_resource);
size[1] = lock->l_resource->lr_lvb_len;
if (size[1] > 0) {
void *lvb = lustre_msg_buf(req->rq_repmsg,
memcpy(lvb, lock->l_resource->lr_lvb_data,
size[1]);
}
- up(&lock->l_resource->lr_lvb_sem);
+ unlock_res(lock->l_resource);
} else {
ldlm_lock_destroy(lock);
}
} else {
void *res = NULL;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_DEBUG(lock, "server-side convert handler START");
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
do_gettimeofday(&lock->l_enqueued_time);
res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
(int *)&dlm_rep->lock_flags);
if (res) {
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (ldlm_del_waiting_lock(lock))
- CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ LDLM_DEBUG(lock, "converted waiting lock");
req->rq_status = 0;
} else {
req->rq_status = EDEADLOCK;
if (lock) {
if (!req->rq_status)
ldlm_reprocess_all(lock->l_resource);
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_DEBUG(lock, "server-side convert handler END");
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_LOCK_PUT(lock);
} else
LDLM_DEBUG_NOLOCK("server-side convert handler END");
//(res, req->rq_reqmsg, 1);
}
- l_lock(&res->lr_namespace->ns_lock);
ldlm_lock_cancel(lock);
if (ldlm_del_waiting_lock(lock))
CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
- l_unlock(&res->lr_namespace->ns_lock);
req->rq_status = rc;
}
if (lock) {
ldlm_reprocess_all(lock->l_resource);
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_DEBUG(lock, "server-side cancel handler END");
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_LOCK_PUT(lock);
}
int do_ast;
ENTRY;
- l_lock(&ns->ns_lock);
LDLM_DEBUG(lock, "client blocking AST callback handler START");
-
+
+ lock_res(lock->l_resource);
lock->l_flags |= LDLM_FL_CBPENDING;
do_ast = (!lock->l_readers && !lock->l_writers);
+ unlock_res(lock->l_resource);
if (do_ast) {
LDLM_DEBUG(lock, "already unused, calling "
"callback (%p)", lock->l_blocking_ast);
- if (lock->l_blocking_ast != NULL) {
- l_unlock(&ns->ns_lock);
- l_check_no_ns_lock(ns);
+ if (lock->l_blocking_ast != NULL)
lock->l_blocking_ast(lock, ld, lock->l_ast_data,
LDLM_CB_BLOCKING);
- l_lock(&ns->ns_lock);
- }
} else {
LDLM_DEBUG(lock, "Lock still has references, will be"
" cancelled later");
}
LDLM_DEBUG(lock, "client blocking callback handler END");
- l_unlock(&ns->ns_lock);
LDLM_LOCK_PUT(lock);
EXIT;
}
struct ldlm_request *dlm_req,
struct ldlm_lock *lock)
{
+ struct ldlm_resource *res = lock->l_resource;
LIST_HEAD(ast_list);
ENTRY;
- l_lock(&ns->ns_lock);
LDLM_DEBUG(lock, "client completion callback handler START");
+ lock_res(res);
+
/* If we receive the completion AST before the actual enqueue returned,
* then we might need to switch lock modes, resources, or extents. */
if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
&lock->l_resource->lr_name,
sizeof(lock->l_resource->lr_name)) != 0) {
+ unlock_res(res);
ldlm_lock_change_resource(ns, lock,
dlm_req->lock_desc.l_resource.lr_name);
LDLM_DEBUG(lock, "completion AST, new resource");
+ res = lock->l_resource;
+ lock_res(res);
}
if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
}
}
- lock->l_resource->lr_tmp = &ast_list;
- ldlm_grant_lock(lock, req, sizeof(*req), 1);
- lock->l_resource->lr_tmp = NULL;
+ ldlm_grant_lock(lock, &ast_list);
+ unlock_res(res);
+
LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
- l_unlock(&ns->ns_lock);
LDLM_LOCK_PUT(lock);
- ldlm_run_ast_work(ns, &ast_list);
+ ldlm_run_cp_ast_work(&ast_list);
LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
lock);
int rc = -ENOSYS;
ENTRY;
- l_lock(&ns->ns_lock);
LDLM_DEBUG(lock, "client glimpse AST callback handler");
- if (lock->l_glimpse_ast != NULL) {
- l_unlock(&ns->ns_lock);
- l_check_no_ns_lock(ns);
+ if (lock->l_glimpse_ast != NULL)
rc = lock->l_glimpse_ast(lock, req);
- l_lock(&ns->ns_lock);
- }
if (req->rq_repmsg != NULL) {
ptlrpc_reply(req);
ptlrpc_error(req);
}
- l_unlock(&ns->ns_lock);
+ lock_res(lock->l_resource);
if (lock->l_granted_mode == LCK_PW &&
!lock->l_readers && !lock->l_writers &&
time_after(jiffies, lock->l_last_used + 10 * HZ)) {
+ unlock_res(lock->l_resource);
if (ldlm_bl_to_thread(ns, NULL, lock))
ldlm_handle_bl_callback(ns, NULL, lock);
EXIT;
return;
}
+ unlock_res(lock->l_resource);
LDLM_LOCK_PUT(lock);
EXIT;
}
return -ENOMEM;
}
- l_lock_init(&ldlm_handle_lock);
-
return 0;
}
EXPORT_SYMBOL(ldlm_resource_get);
EXPORT_SYMBOL(ldlm_resource_putref);
-/* l_lock.c */
-EXPORT_SYMBOL(l_lock);
-EXPORT_SYMBOL(l_unlock);
-
/* ldlm_lib.c */
EXPORT_SYMBOL(client_import_add_conn);
EXPORT_SYMBOL(client_import_del_conn);
static inline int
ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
- int send_cbs)
+ struct list_head *work_list)
{
struct list_head *tmp;
struct ldlm_lock *lock;
if (lockmode_compat(lock->l_req_mode, req_mode))
continue;
- if (!send_cbs)
+ if (!work_list)
RETURN(0);
compat = 0;
if (lock->l_blocking_ast)
- ldlm_add_ast_work_item(lock, req, NULL, 0);
+ ldlm_add_ast_work_item(lock, req, work_list);
}
RETURN(compat);
- * - the caller has NOT initialized req->lr_tmp, so we must
- * - must call this function with the ns lock held once */
+ * - must call this function with lr_lock held */
int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
- ldlm_error_t *err)
+ ldlm_error_t *err, struct list_head *work_list)
{
struct ldlm_resource *res = lock->l_resource;
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
LASSERT(list_empty(&res->lr_converting));
if (!first_enq) {
- LASSERT(res->lr_tmp != NULL);
- rc = ldlm_plain_compat_queue(&res->lr_granted, lock, 0);
+ LASSERT(work_list != NULL);
+ rc = ldlm_plain_compat_queue(&res->lr_granted, lock, NULL);
if (!rc)
RETURN(LDLM_ITER_STOP);
- rc = ldlm_plain_compat_queue(&res->lr_waiting, lock, 0);
+ rc = ldlm_plain_compat_queue(&res->lr_waiting, lock, NULL);
if (!rc)
RETURN(LDLM_ITER_STOP);
ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL, 0, 1);
+ ldlm_grant_lock(lock, work_list);
RETURN(LDLM_ITER_CONTINUE);
}
restart:
- LASSERT(res->lr_tmp == NULL);
- res->lr_tmp = &rpc_list;
- rc = ldlm_plain_compat_queue(&res->lr_granted, lock, 1);
- rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, 1);
- res->lr_tmp = NULL;
+ rc = ldlm_plain_compat_queue(&res->lr_granted, lock, &rpc_list);
+ rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, &rpc_list);
if (rc != 2) {
/* If either of the compat_queue()s returned 0, then we
* re-ordered! Causes deadlock, because ASTs aren't sent! */
if (list_empty(&lock->l_res_link))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
- l_unlock(&res->lr_namespace->ns_lock);
- rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
- l_lock(&res->lr_namespace->ns_lock);
+ unlock_res(res);
+ rc = ldlm_run_bl_ast_work(&rpc_list);
+ lock_res(res);
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
*flags |= LDLM_FL_BLOCK_GRANTED;
} else {
ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL, 0, 0);
+ ldlm_grant_lock(lock, NULL);
}
RETURN(0);
}
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
lock->l_flags |= LDLM_FL_LOCAL;
+ if (*flags & LDLM_FL_ATOMIC_CB)
+ lock->l_flags |= LDLM_FL_ATOMIC_CB;
lock->l_lvb_swabber = lvb_swabber;
if (policy != NULL)
memcpy(&lock->l_policy_data, policy, sizeof(*policy));
struct lustre_handle *lockh, int mode)
{
/* Set a flag to prevent us from sending a CANCEL (bug 407) */
- l_lock(&ns->ns_lock);
+ lock_res(lock->l_resource);
lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+ unlock_res(lock->l_resource);
LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
- l_unlock(&ns->ns_lock);
ldlm_lock_decref_and_cancel(lockh, mode);
}
if ((*flags) & LDLM_FL_AST_SENT) {
- l_lock(&ns->ns_lock);
+ lock_res(lock->l_resource);
lock->l_flags |= LDLM_FL_CBPENDING;
- l_unlock(&ns->ns_lock);
+ unlock_res(lock->l_resource);
LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
}
LDLM_DEBUG(lock, "client-side cancel");
/* Set this flag to prevent others from getting new references*/
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
lock->l_flags |= LDLM_FL_CBPENDING;
local_only = lock->l_flags & LDLM_FL_LOCAL_ONLY;
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_cancel_callback(lock);
+ unlock_res(lock->l_resource);
if (local_only) {
CDEBUG(D_INFO, "not sending request (at caller's "
sync = LDLM_SYNC; /* force to be sync in user space */
#endif
- l_lock(&ns->ns_lock);
+ spin_lock(&ns->ns_unused_lock);
count = ns->ns_nr_unused - ns->ns_max_unused;
if (count <= 0) {
- l_unlock(&ns->ns_lock);
+ spin_unlock(&ns->ns_unused_lock);
RETURN(0);
}
- list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru) {
+ while (!list_empty(&ns->ns_unused_list)) {
+ struct list_head *tmp = ns->ns_unused_list.next;
+ lock = list_entry(tmp, struct ldlm_lock, l_lru);
LASSERT(!lock->l_readers && !lock->l_writers);
+ LDLM_LOCK_GET(lock); /* dropped by bl thread */
+ spin_unlock(&ns->ns_unused_lock);
+
+ lock_res(lock->l_resource);
+ ldlm_lock_remove_from_lru(lock);
+
/* Setting the CBPENDING flag is a little misleading, but
* prevents an important race; namely, once CBPENDING is set,
* the lock can accumulate no more readers/writers. Since
* won't see this flag and call l_blocking_ast */
lock->l_flags |= LDLM_FL_CBPENDING;
- LDLM_LOCK_GET(lock); /* dropped by bl thread */
- ldlm_lock_remove_from_lru(lock);
-
/* We can't re-add to l_lru as it confuses the refcounting in
* ldlm_lock_remove_from_lru() if an AST arrives after we drop
- * ns_lock below. We use l_tmp and can't use l_pending_chain as
+ * ns_unused_lock below. We use l_tmp and can't use l_pending_chain as
if (sync != LDLM_ASYNC || ldlm_bl_to_thread(ns, NULL, lock))
list_add(&lock->l_tmp, &cblist);
+ unlock_res(lock->l_resource);
+
+ spin_lock(&ns->ns_unused_lock);
+
if (--count == 0)
break;
}
- l_unlock(&ns->ns_lock);
+ spin_unlock(&ns->ns_unused_lock);
list_for_each_entry_safe(lock, next, &cblist, l_tmp) {
list_del_init(&lock->l_tmp);
struct ldlm_res_id res_id, int flags,
void *opaque)
{
- struct ldlm_resource *res;
struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
- struct ldlm_ast_work *w;
+ struct ldlm_resource *res;
+ struct ldlm_lock *lock;
ENTRY;
res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
RETURN(0);
}
- l_lock(&ns->ns_lock);
+ lock_res(res);
list_for_each(tmp, &res->lr_granted) {
- struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
if (opaque != NULL && lock->l_ast_data != opaque) {
/* See CBPENDING comment in ldlm_cancel_lru */
lock->l_flags |= LDLM_FL_CBPENDING;
- OBD_ALLOC(w, sizeof(*w));
- LASSERT(w);
-
- w->w_lock = LDLM_LOCK_GET(lock);
-
- list_add(&w->w_list, &list);
+ LASSERT(list_empty(&lock->l_bl_ast));
+ list_add(&lock->l_bl_ast, &list);
+ LDLM_LOCK_GET(lock);
}
- l_unlock(&ns->ns_lock);
+ unlock_res(res);
list_for_each_safe(tmp, next, &list) {
struct lustre_handle lockh;
int rc;
- w = list_entry(tmp, struct ldlm_ast_work, w_list);
+ lock = list_entry(tmp, struct ldlm_lock, l_bl_ast);
if (flags & LDLM_FL_LOCAL_ONLY) {
- ldlm_lock_cancel(w->w_lock);
+ ldlm_lock_cancel(lock);
} else {
- ldlm_lock2handle(w->w_lock, &lockh);
+ ldlm_lock2handle(lock, &lockh);
rc = ldlm_cli_cancel(&lockh);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_cancel: %d\n", rc);
}
- list_del(&w->w_list);
- LDLM_LOCK_PUT(w->w_lock);
- OBD_FREE(w, sizeof(*w));
+ list_del_init(&lock->l_bl_ast);
+ LDLM_LOCK_PUT(lock);
}
ldlm_resource_putref(res);
{
int no_resource = 0;
- spin_lock(&ns->ns_counter_lock);
+ spin_lock(&ns->ns_hash_lock);
if (ns->ns_resources == 0)
no_resource = 1;
- spin_unlock(&ns->ns_counter_lock);
+ spin_unlock(&ns->ns_hash_lock);
RETURN(no_resource);
}
RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
opaque));
- l_lock(&ns->ns_lock);
+ spin_lock(&ns->ns_hash_lock);
for (i = 0; i < RES_HASH_SIZE; i++) {
- struct list_head *tmp, *next;
- list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
- int rc;
+ struct list_head *tmp;
+ tmp = ns->ns_hash[i].next;
+ while (tmp != &(ns->ns_hash[i])) {
struct ldlm_resource *res;
+ int rc;
+
res = list_entry(tmp, struct ldlm_resource, lr_hash);
ldlm_resource_getref(res);
- l_unlock(&ns->ns_lock);
+ spin_unlock(&ns->ns_hash_lock);
rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
flags, opaque);
CERROR("cancel_unused_res ("LPU64"): %d\n",
res->lr_name.name[0], rc);
- l_lock(&ns->ns_lock);
- next = tmp->next;
- ldlm_resource_putref(res);
+ spin_lock(&ns->ns_hash_lock);
+ tmp = tmp->next;
+ ldlm_resource_putref_locked(res);
}
}
- l_unlock(&ns->ns_lock);
+ spin_unlock(&ns->ns_hash_lock);
+
if (flags & LDLM_FL_CONFIG_CHANGE)
l_wait_event(ns->ns_waitq, have_no_nsresource(ns), &lwi);
struct list_head *tmp, *next;
struct ldlm_lock *lock;
int rc = LDLM_ITER_CONTINUE;
- struct ldlm_namespace *ns = res->lr_namespace;
ENTRY;
if (!res)
RETURN(LDLM_ITER_CONTINUE);
- l_lock(&ns->ns_lock);
+ lock_res(res);
list_for_each_safe(tmp, next, &res->lr_granted) {
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
GOTO(out, rc = LDLM_ITER_STOP);
}
out:
- l_unlock(&ns->ns_lock);
+ unlock_res(res);
RETURN(rc);
}
ldlm_res_iterator_t iter, void *closure)
{
int i, rc = LDLM_ITER_CONTINUE;
+ struct ldlm_resource *res;
+ struct list_head *tmp;
- l_lock(&ns->ns_lock);
+ spin_lock(&ns->ns_hash_lock);
for (i = 0; i < RES_HASH_SIZE; i++) {
- struct list_head *tmp, *next;
- list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
- struct ldlm_resource *res =
- list_entry(tmp, struct ldlm_resource, lr_hash);
-
+ tmp = ns->ns_hash[i].next;
+ while (tmp != &(ns->ns_hash[i])) {
+ res = list_entry(tmp, struct ldlm_resource, lr_hash);
ldlm_resource_getref(res);
+ spin_unlock(&ns->ns_hash_lock);
+
rc = iter(res, closure);
- ldlm_resource_putref(res);
+
+ spin_lock(&ns->ns_hash_lock);
+ tmp = tmp->next;
+ ldlm_resource_putref_locked(res);
if (rc == LDLM_ITER_STOP)
GOTO(out, rc);
}
}
out:
- l_unlock(&ns->ns_lock);
+ spin_unlock(&ns->ns_hash_lock);
RETURN(rc);
}
return;
}
- l_lock(&ns->ns_lock);
ldlm_resource_foreach(res, iter, data);
- l_unlock(&ns->ns_lock);
ldlm_resource_putref(res);
EXIT;
}
/* ensure this doesn't fall to 0 before all have been queued */
atomic_inc(&imp->imp_replay_inflight);
- l_lock(&ns->ns_lock);
(void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
list_for_each_safe(pos, next, &list) {
if (rc)
break; /* or try to do the rest? */
}
- l_unlock(&ns->ns_lock);
atomic_dec(&imp->imp_replay_inflight);
strcpy(ns->ns_name, name);
INIT_LIST_HEAD(&ns->ns_root_list);
- l_lock_init(&ns->ns_lock);
ns->ns_refcount = 0;
ns->ns_client = client;
- spin_lock_init(&ns->ns_counter_lock);
- ns->ns_locks = 0;
+ spin_lock_init(&ns->ns_hash_lock);
+ atomic_set(&ns->ns_locks, 0);
ns->ns_resources = 0;
init_waitqueue_head(&ns->ns_waitq);
INIT_LIST_HEAD(&ns->ns_unused_list);
ns->ns_nr_unused = 0;
ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
+ spin_lock_init(&ns->ns_unused_lock);
down(&ldlm_namespace_lock);
list_add(&ns->ns_list_chain, &ldlm_namespace_list);
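
Converting ns_locks from a spinlock-protected __u64 to an atomic_t works because ns_counter_lock guarded nothing but that counter; a single atomic operation carries the same information with less contention. A userspace equivalent with C11 atomics (illustrative only):

#include <stdatomic.h>
#include <stdio.h>

static atomic_long ns_locks = 0;       /* was: spinlock_t + __u64 */

void lock_created(void)   { atomic_fetch_add(&ns_locks, 1); }
void lock_destroyed(void) { atomic_fetch_sub(&ns_locks, 1); }

int main(void)
{
        lock_created();
        lock_created();
        lock_destroyed();
        printf("live locks: %ld\n", atomic_load(&ns_locks));
        return 0;
}
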
static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
int flags)
{
- struct list_head *tmp, *pos;
+ struct list_head *tmp;
int rc = 0, client = res->lr_namespace->ns_client;
int local_only = (flags & LDLM_FL_LOCAL_ONLY);
ENTRY;
- list_for_each_safe(tmp, pos, q) {
- struct ldlm_lock *lock;
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- LDLM_LOCK_GET(lock);
+
+ do {
+ struct ldlm_lock *lock = NULL;
+
+ /* first, look for a lock that has not been cleaned up yet;
+ * all cleaned locks are marked by the CLEANED flag */
+ lock_res(res);
+ list_for_each(tmp, q) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ if (lock->l_flags & LDLM_FL_CLEANED) {
+ lock = NULL;
+ continue;
+ }
+ LDLM_LOCK_GET(lock);
+ lock->l_flags |= LDLM_FL_CLEANED;
+ break;
+ }
+
+ if (lock == NULL) {
+ unlock_res(res);
+ break;
+ }
/* Set CBPENDING so nothing in the cancellation path
* can match this lock */
* will go away ... */
/* ... without sending a CANCEL message. */
lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+ unlock_res(res);
LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
if (lock->l_completion_ast)
lock->l_completion_ast(lock, 0, NULL);
if (client) {
struct lustre_handle lockh;
+
+ unlock_res(res);
ldlm_lock2handle(lock, &lockh);
if (!local_only) {
rc = ldlm_cli_cancel(&lockh);
if (local_only || rc != ELDLM_OK)
ldlm_lock_cancel(lock);
} else {
+ ldlm_resource_unlink_lock(lock);
+ unlock_res(res);
LDLM_DEBUG(lock, "Freeing a lock still held by a "
"client node");
-
- ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy(lock);
}
LDLM_LOCK_PUT(lock);
- }
+ } while (1);
+
EXIT;
}
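
Since cleanup_resource() can no longer hold one big lock across the whole queue, it restarts the scan from the list head after every dropped lock; the LDLM_FL_CLEANED flag guarantees each lock is claimed exactly once even though the queue may change while the resource is unlocked. A userspace model of the restartable scan (hypothetical names, not Lustre code):

#include <pthread.h>
#include <stdio.h>

#define FL_CLEANED 0x1

struct lk {
        struct lk *next;
        int flags;
        int id;
};

static pthread_mutex_t res_lock = PTHREAD_MUTEX_INITIALIZER;
static struct lk c = { NULL, 0, 3 };
static struct lk b = { &c, 0, 2 };
static struct lk a = { &b, 0, 1 };
static struct lk *queue = &a;

int main(void)
{
        for (;;) {
                struct lk *victim = NULL, *p;

                /* find the first lock not yet handled */
                pthread_mutex_lock(&res_lock);
                for (p = queue; p; p = p->next) {
                        if (p->flags & FL_CLEANED)
                                continue;
                        p->flags |= FL_CLEANED;   /* claim it */
                        victim = p;
                        break;
                }
                pthread_mutex_unlock(&res_lock);

                if (victim == NULL)
                        break;                    /* everything handled */

                /* cancel with no spinlock held; queue may mutate meanwhile */
                printf("cleaning lock %d\n", victim->id);
        }
        return 0;
}
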
int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags)
{
+ struct list_head *tmp;
int i;
if (ns == NULL) {
return ELDLM_OK;
}
- l_lock(&ns->ns_lock);
+ /* FIXME: protect by ns_hash_lock -bzzz */
for (i = 0; i < RES_HASH_SIZE; i++) {
- struct list_head *tmp, *pos;
- list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+ spin_lock(&ns->ns_hash_lock);
+ tmp = ns->ns_hash[i].next;
+ while (tmp != &(ns->ns_hash[i])) {
struct ldlm_resource *res;
res = list_entry(tmp, struct ldlm_resource, lr_hash);
ldlm_resource_getref(res);
+ spin_unlock(&ns->ns_hash_lock);
cleanup_resource(res, &res->lr_granted, flags);
cleanup_resource(res, &res->lr_converting, flags);
cleanup_resource(res, &res->lr_waiting, flags);
+ spin_lock(&ns->ns_hash_lock);
+ tmp = tmp->next;
+
/* XXX what a mess: don't force cleanup if we're
* local_only (which is only used by recovery). In that
* case, we probably still have outstanding lock refs
* which reference these resources. -phil */
- if (!ldlm_resource_putref(res) &&
+ if (!ldlm_resource_putref_locked(res) &&
!(flags & LDLM_FL_LOCAL_ONLY)) {
CERROR("Resource refcount nonzero (%d) after "
"lock cleanup; forcing cleanup.\n",
atomic_read(&res->lr_refcount));
ldlm_resource_dump(D_ERROR, res);
atomic_set(&res->lr_refcount, 1);
- ldlm_resource_putref(res);
+ ldlm_resource_putref_locked(res);
}
}
+ spin_unlock(&ns->ns_hash_lock);
}
- l_unlock(&ns->ns_lock);
return ELDLM_OK;
}
INIT_LIST_HEAD(&res->lr_granted);
INIT_LIST_HEAD(&res->lr_converting);
INIT_LIST_HEAD(&res->lr_waiting);
- sema_init(&res->lr_lvb_sem, 1);
atomic_set(&res->lr_refcount, 1);
+ spin_lock_init(&res->lr_lock);
+
+ /* whoever creates the resource must unlock
+ * the semaphore after LVB initialization */
+ init_MUTEX_LOCKED(&res->lr_lvb_sem);
return res;
}
+/* must be called with hash lock held */
+static struct ldlm_resource *
+ldlm_resource_find(struct ldlm_namespace *ns, struct ldlm_res_id name, __u32 hash)
+{
+ struct list_head *bucket, *tmp;
+ struct ldlm_resource *res;
+
+ LASSERT_SPIN_LOCKED(&ns->ns_hash_lock);
+ bucket = ns->ns_hash + hash;
+
+ list_for_each(tmp, bucket) {
+ res = list_entry(tmp, struct ldlm_resource, lr_hash);
+ if (memcmp(&res->lr_name, &name, sizeof(res->lr_name)) == 0)
+ return res;
+ }
+
+ return NULL;
+}
+
-/* Args: locked namespace
+/* Args: unlocked namespace
* Returns: newly-allocated, referenced, unlocked resource */
static struct ldlm_resource *
ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent,
- struct ldlm_res_id name, __u32 type)
+ struct ldlm_res_id name, __u32 hash, __u32 type)
{
struct list_head *bucket;
- struct ldlm_resource *res;
+ struct ldlm_resource *res, *old_res;
ENTRY;
LASSERTF(type >= LDLM_MIN_TYPE && type <= LDLM_MAX_TYPE,
if (!res)
RETURN(NULL);
- spin_lock(&ns->ns_counter_lock);
- ns->ns_resources++;
- spin_unlock(&ns->ns_counter_lock);
-
- l_lock(&ns->ns_lock);
memcpy(&res->lr_name, &name, sizeof(res->lr_name));
res->lr_namespace = ns;
- ns->ns_refcount++;
-
res->lr_type = type;
res->lr_most_restr = LCK_NL;
- bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
+ spin_lock(&ns->ns_hash_lock);
+ old_res = ldlm_resource_find(ns, name, hash);
+ if (old_res) {
+ /* someone else won the race and added the resource first */
+ ldlm_resource_getref(old_res);
+ spin_unlock(&ns->ns_hash_lock);
+ OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+ /* synchronize WRT resource creation */
+ if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
+ down(&old_res->lr_lvb_sem);
+ up(&old_res->lr_lvb_sem);
+ }
+ RETURN(old_res);
+ }
+
+ /* we won! let's add the resource */
+ bucket = ns->ns_hash + hash;
list_add(&res->lr_hash, bucket);
+ ns->ns_resources++;
+ ns->ns_refcount++;
if (parent == NULL) {
list_add(&res->lr_childof, &ns->ns_root_list);
res->lr_parent = parent;
list_add(&res->lr_childof, &parent->lr_children);
}
- l_unlock(&ns->ns_lock);
+ spin_unlock(&ns->ns_hash_lock);
+ if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
+ int rc;
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
+ rc = ns->ns_lvbo->lvbo_init(res);
+ if (rc)
+ CERROR("lvbo_init failed for resource "
+ LPU64": rc %d\n", name.name[0], rc);
+ /* the resource was created with lr_lvb_sem held */
+ up(&res->lr_lvb_sem);
+ }
RETURN(res);
}
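
ldlm_resource_add() now resolves the create race with the classic optimistic-insert pattern: allocate outside the lock, re-run the lookup under ns_hash_lock, and free the local copy if another thread published its resource first. A minimal pthread sketch of find-or-insert (single bucket; names are hypothetical):

#include <pthread.h>
#include <stdlib.h>

struct res { struct res *next; int name; int refs; };

static pthread_mutex_t hash_lock = PTHREAD_MUTEX_INITIALIZER;
static struct res *bucket;

static struct res *find_locked(int name)
{
        struct res *r;
        for (r = bucket; r; r = r->next)
                if (r->name == name)
                        return r;
        return NULL;
}

struct res *get_or_add(int name)
{
        struct res *r, *old;

        r = malloc(sizeof(*r));        /* allocate before taking the lock */
        if (!r)
                return NULL;
        r->name = name;
        r->refs = 1;

        pthread_mutex_lock(&hash_lock);
        old = find_locked(name);
        if (old) {                     /* lost the race: use the winner's */
                old->refs++;
                pthread_mutex_unlock(&hash_lock);
                free(r);
                return old;
        }
        r->next = bucket;              /* won the race: publish ours */
        bucket = r;
        pthread_mutex_unlock(&hash_lock);
        return r;
}

int main(void) { return get_or_add(7) ? 0 : 1; }
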
ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
struct ldlm_res_id name, __u32 type, int create)
{
+ __u32 hash = ldlm_hash_fn(parent, name);
struct ldlm_resource *res = NULL;
- struct list_head *bucket, *tmp;
ENTRY;
LASSERT(ns != NULL);
LASSERT(ns->ns_hash != NULL);
LASSERT(name.name[0] != 0);
- l_lock(&ns->ns_lock);
- bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
-
- list_for_each(tmp, bucket) {
- res = list_entry(tmp, struct ldlm_resource, lr_hash);
-
- if (memcmp(&res->lr_name, &name, sizeof(res->lr_name)) == 0) {
- ldlm_resource_getref(res);
- l_unlock(&ns->ns_lock);
- RETURN(res);
+ spin_lock(&ns->ns_hash_lock);
+ res = ldlm_resource_find(ns, name, hash);
+ if (res) {
+ ldlm_resource_getref(res);
+ spin_unlock(&ns->ns_hash_lock);
+ /* synchronize WRT resource creation */
+ if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
+ down(&res->lr_lvb_sem);
+ up(&res->lr_lvb_sem);
}
+ RETURN(res);
}
+ spin_unlock(&ns->ns_hash_lock);
- if (create) {
- res = ldlm_resource_add(ns, parent, name, type);
- if (res == NULL)
- GOTO(out, NULL);
- } else {
- res = NULL;
- }
-
- if (create && ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
- int rc;
-
- /* Although this is technically a lock inversion risk (lvb_sem
- * should be taken before DLM lock), this resource was just
- * created, so nobody else can take the lvb_sem yet. -p */
- down(&res->lr_lvb_sem);
- /* Drop the dlm lock, because lvbo_init can touch the disk */
- l_unlock(&ns->ns_lock);
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
- rc = ns->ns_lvbo->lvbo_init(res);
- up(&res->lr_lvb_sem);
- if (rc)
- CERROR("lvbo_init failed for resource "
- LPU64": rc %d\n", name.name[0], rc);
- } else {
-out:
- l_unlock(&ns->ns_lock);
- }
+ if (create == 0)
+ RETURN(NULL);
+ res = ldlm_resource_add(ns, parent, name, hash, type);
RETURN(res);
}
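
The lr_lvb_sem handshake deserves a note: the creator initializes the semaphore in the locked state and releases it only after lvbo_init() completes, while any thread that finds an existing resource does a down()/up() pair, which simply blocks until the LVB is valid and then lets the next waiter through. A userspace model with POSIX semaphores (sleep() stands in for the lvbo_init() disk I/O; not Lustre code):

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>

static sem_t lvb_sem;
static int lvb_value;                  /* valid only after creator's post */

static void *creator(void *arg)
{
        (void)arg;
        sleep(1);                      /* model slow lvbo_init() disk I/O */
        lvb_value = 42;
        sem_post(&lvb_sem);            /* "up": LVB is now initialized */
        return NULL;
}

static void *finder(void *arg)
{
        (void)arg;
        sem_wait(&lvb_sem);            /* down(): wait for initialization */
        sem_post(&lvb_sem);            /* up(): let other finders through */
        printf("lvb = %d\n", lvb_value);
        return NULL;
}

int main(void)
{
        pthread_t t1, t2;

        sem_init(&lvb_sem, 0, 0);      /* starts locked, like init_MUTEX_LOCKED */
        pthread_create(&t1, NULL, creator, NULL);
        pthread_create(&t2, NULL, finder, NULL);
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        return 0;
}
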
return res;
}
+int __ldlm_resource_putref_final(struct ldlm_resource *res, int locked)
+{
+ struct ldlm_namespace *ns = res->lr_namespace;
+ ENTRY;
+
+ if (!locked)
+ spin_lock(&ns->ns_hash_lock);
+
+ if (atomic_read(&res->lr_refcount) != 0) {
+ /* We lost the race. */
+ if (!locked)
+ spin_unlock(&ns->ns_hash_lock);
+ RETURN(0);
+ }
+
+ if (!list_empty(&res->lr_granted)) {
+ ldlm_resource_dump(D_ERROR, res);
+ LBUG();
+ }
+
+ if (!list_empty(&res->lr_converting)) {
+ ldlm_resource_dump(D_ERROR, res);
+ LBUG();
+ }
+
+ if (!list_empty(&res->lr_waiting)) {
+ ldlm_resource_dump(D_ERROR, res);
+ LBUG();
+ }
+
+ if (!list_empty(&res->lr_children)) {
+ ldlm_resource_dump(D_ERROR, res);
+ LBUG();
+ }
+
+ ns->ns_refcount--;
+ list_del_init(&res->lr_hash);
+ list_del_init(&res->lr_childof);
+
+ ns->ns_resources--;
+ if (ns->ns_resources == 0)
+ wake_up(&ns->ns_waitq);
+
+ if (!locked)
+ spin_unlock(&ns->ns_hash_lock);
+
+ /* we just unhashed the resource, nobody should find it */
+ LASSERT(atomic_read(&res->lr_refcount) == 0);
+ if (res->lr_lvb_data)
+ OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
+ OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+ RETURN(1);
+}
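
The recheck under ns_hash_lock is the heart of the new teardown: between our atomic_dec_and_test() and taking the hash lock, a hash lookup may have re-referenced the resource, in which case the free must be abandoned; once the resource is unhashed under the lock, no new lookup can revive it. A compact model of that race (C11 atomics plus pthreads; names are hypothetical):

#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

struct res {
        atomic_int refs;
        int hashed;                    /* protected by hash_lock */
};

static pthread_mutex_t hash_lock = PTHREAD_MUTEX_INITIALIZER;

/* Returns 1 if the resource was freed, 0 if it remains. */
int putref(struct res *r)
{
        if (atomic_fetch_sub(&r->refs, 1) != 1)
                return 0;              /* fast path: others still hold refs */

        pthread_mutex_lock(&hash_lock);
        if (atomic_load(&r->refs) != 0) {
                /* lost the race: a hash lookup revived the object */
                pthread_mutex_unlock(&hash_lock);
                return 0;
        }
        r->hashed = 0;                 /* unhash: no new lookup can find it */
        pthread_mutex_unlock(&hash_lock);

        free(r);
        return 1;
}

int main(void)
{
        struct res *r = calloc(1, sizeof(*r));

        atomic_store(&r->refs, 1);
        r->hashed = 1;
        return putref(r) ? 0 : 1;
}
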
+
/* Returns 1 if the resource was freed, 0 if it remains. */
int ldlm_resource_putref(struct ldlm_resource *res)
{
LASSERT(atomic_read(&res->lr_refcount) > 0);
LASSERT(atomic_read(&res->lr_refcount) < LI_POISON);
- if (atomic_dec_and_test(&res->lr_refcount)) {
- struct ldlm_namespace *ns = res->lr_namespace;
- ENTRY;
-
- l_lock(&ns->ns_lock);
-
- if (atomic_read(&res->lr_refcount) != 0) {
- /* We lost the race. */
- l_unlock(&ns->ns_lock);
- RETURN(rc);
- }
-
- if (!list_empty(&res->lr_granted)) {
- ldlm_resource_dump(D_ERROR, res);
- LBUG();
- }
+ LASSERT(atomic_read(&res->lr_refcount) >= 0);
+ if (atomic_dec_and_test(&res->lr_refcount))
+ rc = __ldlm_resource_putref_final(res, 0);
- if (!list_empty(&res->lr_converting)) {
- ldlm_resource_dump(D_ERROR, res);
- LBUG();
- }
-
- if (!list_empty(&res->lr_waiting)) {
- ldlm_resource_dump(D_ERROR, res);
- LBUG();
- }
-
- if (!list_empty(&res->lr_children)) {
- ldlm_resource_dump(D_ERROR, res);
- LBUG();
- }
-
- ns->ns_refcount--;
- list_del_init(&res->lr_hash);
- list_del_init(&res->lr_childof);
- if (res->lr_lvb_data)
- OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
- l_unlock(&ns->ns_lock);
+ RETURN(rc);
+}
- OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+/* Returns 1 if the resource was freed, 0 if it remains. */
+int ldlm_resource_putref_locked(struct ldlm_resource *res)
+{
+ int rc = 0;
+ ENTRY;
- spin_lock(&ns->ns_counter_lock);
- ns->ns_resources--;
- if (ns->ns_resources == 0)
- wake_up(&ns->ns_waitq);
- spin_unlock(&ns->ns_counter_lock);
+ CDEBUG(D_INFO, "putref res: %p count: %d\n", res,
+ atomic_read(&res->lr_refcount) - 1);
+ LASSERT(atomic_read(&res->lr_refcount) > 0);
+ LASSERT(atomic_read(&res->lr_refcount) < LI_POISON);
- rc = 1;
- EXIT;
- }
+ LASSERT(atomic_read(&res->lr_refcount) >= 0);
+ if (atomic_dec_and_test(&res->lr_refcount))
+ rc = __ldlm_resource_putref_final(res, 1);
RETURN(rc);
}
void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
struct ldlm_lock *lock)
{
- l_lock(&res->lr_namespace->ns_lock);
+ check_res_locked(res);
ldlm_resource_dump(D_OTHER, res);
CDEBUG(D_OTHER, "About to add this lock:\n");
if (lock->l_destroyed) {
CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
- goto out;
+ return;
}
LASSERT(list_empty(&lock->l_res_link));
list_add_tail(&lock->l_res_link, head);
- out:
- l_unlock(&res->lr_namespace->ns_lock);
}
void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
{
struct ldlm_resource *res = original->l_resource;
- l_lock(&res->lr_namespace->ns_lock);
+ check_res_locked(res);
ldlm_resource_dump(D_OTHER, res);
CDEBUG(D_OTHER, "About to insert this lock after %p:\n", original);
if (new->l_destroyed) {
CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
- goto out;
+ return;
}
LASSERT(list_empty(&new->l_res_link));
-
list_add(&new->l_res_link, &original->l_res_link);
- out:
- l_unlock(&res->lr_namespace->ns_lock);
}
void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
{
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ check_res_locked(lock->l_resource);
list_del_init(&lock->l_res_link);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}
void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
CDEBUG(level, "--- Namespace: %s (rc: %d, client: %d)\n",
ns->ns_name, ns->ns_refcount, ns->ns_client);
- l_lock(&ns->ns_lock);
- if (time_after(jiffies, ns->ns_next_dump)) {
- list_for_each(tmp, &ns->ns_root_list) {
- struct ldlm_resource *res;
- res = list_entry(tmp, struct ldlm_resource, lr_childof);
-
- /* Once we have resources with children, this should
- * really dump them recursively. */
- ldlm_resource_dump(level, res);
- }
- ns->ns_next_dump = jiffies + 10 * HZ;
+ if (time_before(jiffies, ns->ns_next_dump))
+ return;
+
+ spin_lock(&ns->ns_hash_lock);
+ tmp = ns->ns_root_list.next;
+ while (tmp != &ns->ns_root_list) {
+ struct ldlm_resource *res;
+ res = list_entry(tmp, struct ldlm_resource, lr_childof);
+
+ ldlm_resource_getref(res);
+ spin_unlock(&ns->ns_hash_lock);
+
+ lock_res(res);
+ ldlm_resource_dump(level, res);
+ unlock_res(res);
+
+ spin_lock(&ns->ns_hash_lock);
+ tmp = tmp->next;
+ ldlm_resource_putref_locked(res);
}
- l_unlock(&ns->ns_lock);
+ ns->ns_next_dump = jiffies + 10 * HZ;
+ spin_unlock(&ns->ns_hash_lock);
}
void ldlm_resource_dump(int level, struct ldlm_resource *res)
goto iput;
ll_pgcache_remove_extent(inode, lsm, lock, stripe);
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
down(&lli->lli_size_sem);
+ lock_res(lock->l_resource);
kms = ldlm_extent_shift_kms(lock,
lsm->lsm_oinfo[stripe].loi_kms);
LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
lsm->lsm_oinfo[stripe].loi_kms, kms);
lsm->lsm_oinfo[stripe].loi_kms = kms;
+ unlock_res(lock->l_resource);
up(&lli->lli_size_sem);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
//ll_try_done_writing(inode);
iput:
iput(inode);
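
Note the reordering in this hunk: lli_size_sem is taken first and lock_res() second, because lr_lock is a spinlock and its holder must not sleep in down(); the old l_lock() appears to have been a sleeping lock, which is why the original order was tolerable. The same reordering recurs in the i_sem hunk just below. A minimal illustration of the required nesting (pthread mutex standing in for the semaphore; not Lustre code):

#include <pthread.h>

static pthread_mutex_t size_sem = PTHREAD_MUTEX_INITIALIZER; /* may sleep */
static pthread_spinlock_t res_lock;                          /* may not */

static unsigned long long kms;

void update_kms(unsigned long long new_kms)
{
        /* sleeping lock first ... */
        pthread_mutex_lock(&size_sem);

        /* ... spinlock strictly inside: no sleeping past this point */
        pthread_spin_lock(&res_lock);
        if (new_kms > kms)
                kms = new_kms;
        pthread_spin_unlock(&res_lock);

        pthread_mutex_unlock(&size_sem);
}

int main(void)
{
        pthread_spin_init(&res_lock, PTHREAD_PROCESS_PRIVATE);
        update_kms(4096);
        return 0;
}
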
lvb = lock->l_lvb_data;
lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
down(&inode->i_sem);
+ lock_res(lock->l_resource);
kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
kms = ldlm_extent_shift_kms(NULL, kms);
if (lsm->lsm_oinfo[stripe].loi_kms != kms)
LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
lsm->lsm_oinfo[stripe].loi_kms, kms);
lsm->lsm_oinfo[stripe].loi_kms = kms;
+ unlock_res(lock->l_resource);
up(&inode->i_sem);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}
iput:
struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
{
struct inode *inode = NULL;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
+ /* NOTE: we depend on atomic igrab() -bzzz */
+ lock_res(lock->l_resource);
if (lock->l_ast_data) {
struct ll_inode_info *lli = ll_i2info(lock->l_ast_data);
if (lli->lli_inode_magic == LLI_INODE_MAGIC) {
inode = NULL;
}
}
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
return inode;
}
lock = ldlm_handle2lock(lockh);
LASSERT(lock != NULL);
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
#ifdef __KERNEL__
if (lock->l_ast_data && lock->l_ast_data != data) {
struct inode *new_inode = data;
}
#endif
lock->l_ast_data = data;
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
LDLM_LOCK_PUT(lock);
EXIT;
struct dentry *de = mds_id2dentry(obd, id, mnt), *retval = de;
ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
struct ldlm_res_id res_id = { .name = {0} };
- int flags = 0, rc;
+ int flags = LDLM_FL_ATOMIC_CB, rc;
ENTRY;
if (IS_ERR(de))
RETURN(ERR_PTR(-ENOLCK));
}
}
- flags = 0;
+ flags = LDLM_FL_ATOMIC_CB;
res_id.name[2] = full_name_hash((unsigned char *)name, namelen);
}
/* XXX layering violation! -phil */
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
/*
* get this: if mds_blocking_ast is racing with mds_intent_policy, such
* blocking function anymore. So check, and return early, if so.
*/
if (lock->l_blocking_ast != mds_blocking_ast) {
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
RETURN(0);
}
lock->l_flags |= LDLM_FL_CBPENDING;
do_ast = (!lock->l_readers && !lock->l_writers);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
if (do_ast) {
struct lustre_handle lockh;
if (!IS_ERR(new) && new->d_inode) {
struct lustre_id sid;
- CWARN("mkdir() repairing is on its way: %lu/%lu\n",
- (unsigned long)id_ino(&id), (unsigned long)id_gen(&id));
+ CDEBUG(D_OTHER, "mkdir repairing %lu/%lu\n",
+ (unsigned long)id_ino(&id),
+ (unsigned long)id_gen(&id));
obdo_from_inode(&repbody->oa, new->d_inode,
FILTER_VALID_FLAGS);
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks;
+ struct list_head work;
struct ldlm_lock *lock, *next;
struct ldlm_lock_desc desc;
return;
ENTRY;
- l_lock(&ns->ns_lock);
+ CERROR("implement right locking here! -bzzz\n");
+ INIT_LIST_HEAD(&work);
+ spin_lock(&exp->exp_ldlm_data.led_lock);
list_for_each_entry_safe(lock, next, locklist, l_export_chain) {
- if (lock->l_req_mode != lock->l_granted_mode)
+
+ lock_res(lock->l_resource);
+ if (lock->l_req_mode != lock->l_granted_mode) {
+ unlock_res(lock->l_resource);
continue;
+ }
LASSERT(lock->l_resource);
if (lock->l_resource->lr_type != LDLM_IBITS &&
- lock->l_resource->lr_type != LDLM_PLAIN)
+ lock->l_resource->lr_type != LDLM_PLAIN) {
+ unlock_res(lock->l_resource);
continue;
+ }
- if (lock->l_flags & LDLM_FL_AST_SENT)
+ if (lock->l_flags & LDLM_FL_AST_SENT) {
+ unlock_res(lock->l_resource);
continue;
+ }
lock->l_flags |= LDLM_FL_AST_SENT;
+ unlock_res(lock->l_resource);
/* the desc just pretends the lock is exclusive */
ldlm_lock2desc(lock, &desc);
lock->l_blocking_ast(lock, &desc, NULL, LDLM_CB_BLOCKING);
}
- l_unlock(&ns->ns_lock);
+ spin_unlock(&exp->exp_ldlm_data.led_lock);
+
EXIT;
}
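
Each lock in the export list is now examined under its own lr_lock, and LDLM_FL_AST_SENT is tested and set inside the same critical section, so even if this path races with the regular callback path the blocking AST is issued at most once per lock. A sketch of test-and-set-under-lock (hypothetical names):

#include <pthread.h>
#include <stdio.h>

#define FL_AST_SENT 0x1

struct lk {
        pthread_spinlock_t lock;
        int flags;
};

/* Returns 1 if this caller owns sending the AST, 0 if already sent. */
int claim_ast(struct lk *l)
{
        int mine = 0;

        pthread_spin_lock(&l->lock);
        if (!(l->flags & FL_AST_SENT)) {
                l->flags |= FL_AST_SENT;   /* claim while still locked */
                mine = 1;
        }
        pthread_spin_unlock(&l->lock);
        return mine;
}

int main(void)
{
        struct lk l = { .flags = 0 };

        pthread_spin_init(&l.lock, PTHREAD_PROCESS_PRIVATE);
        printf("first: %d second: %d\n", claim_ast(&l), claim_ast(&l));
        return 0;
}
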
if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
return;
- l_lock(&obd->obd_namespace->ns_lock);
+ spin_lock(&obd->obd_namespace->ns_hash_lock);
list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
struct ldlm_lock *lock;
lock = list_entry(iter, struct ldlm_lock, l_export_chain);
lockh->cookie);
if (old_lock)
*old_lock = LDLM_LOCK_GET(lock);
- l_unlock(&obd->obd_namespace->ns_lock);
+ spin_unlock(&obd->obd_namespace->ns_hash_lock);
return;
}
}
- l_unlock(&obd->obd_namespace->ns_lock);
+ spin_unlock(&obd->obd_namespace->ns_hash_lock);
/* If the xid matches, then we know this is a resent request,
* and allow it. (It's probably an OPEN, for which we don't
}
/* Fixup the lock to be given to the client */
- l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
+ lock_res(new_lock->l_resource);
new_lock->l_readers = 0;
new_lock->l_writers = 0;
new_lock->l_export = class_export_get(req->rq_export);
+
+ spin_lock(&new_lock->l_export->exp_ldlm_data.led_lock);
list_add(&new_lock->l_export_chain,
&new_lock->l_export->exp_ldlm_data.led_held_locks);
+ spin_unlock(&new_lock->l_export->exp_ldlm_data.led_lock);
new_lock->l_blocking_ast = lock->l_blocking_ast;
new_lock->l_completion_ast = lock->l_completion_ast;
new_lock->l_flags &= ~LDLM_FL_LOCAL;
+ unlock_res(new_lock->l_resource);
LDLM_LOCK_PUT(new_lock);
- l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
RETURN(ELDLM_LOCK_REPLACED);
}
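
The fixup takes led_lock strictly inside lr_lock, matching the lock-ordering table this patch introduces (lr_lock, then led_lock); keeping a single global order for the two spinlocks is what rules out ABBA deadlocks between the resource and export paths. A sketch of honoring a fixed order (illustrative only, names hypothetical):

#include <pthread.h>

static pthread_spinlock_t lr_lock;   /* outer: per-resource */
static pthread_spinlock_t led_lock;  /* inner: per-export */

static int export_chain_len;

void attach_lock_to_export(void)
{
        pthread_spin_lock(&lr_lock);       /* always take the outer first */
        pthread_spin_lock(&led_lock);      /* then the inner */
        export_chain_len++;                /* mutate the export chain */
        pthread_spin_unlock(&led_lock);
        pthread_spin_unlock(&lr_lock);
}

int main(void)
{
        pthread_spin_init(&lr_lock, PTHREAD_PROCESS_PRIVATE);
        pthread_spin_init(&led_lock, PTHREAD_PROCESS_PRIVATE);
        attach_lock_to_export();
        return 0;
}
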
{
struct ldlm_res_id child_res_id = { .name = { inode->i_ino, 0, 1, 0 } };
struct lustre_handle lockh;
- int lock_flags = 0;
+ int lock_flags = LDLM_FL_ATOMIC_CB;
int rc;
ENTRY;
if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
struct ldlm_res_id child_res_id = { .name = {0}};
ldlm_policy_data_t policy;
- int lock_flags = 0;
+ int lock_flags = LDLM_FL_ATOMIC_CB;
/* LOOKUP lock will protect dentry on client -bzzz */
policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP |
CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
res_id[0]->name[0], res_id[1]->name[0]);
- flags = LDLM_FL_LOCAL_ONLY;
+ flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
LDLM_IBITS, policies[0], lock_modes[0], &flags,
mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
memcpy(handles[1], handles[0], sizeof(*(handles[1])));
ldlm_lock_addref(handles[1], lock_modes[1]);
} else if (res_id[1]->name[0] != 0) {
- flags = LDLM_FL_LOCAL_ONLY;
+ flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
*res_id[1], LDLM_IBITS, policies[1],
lock_modes[1], &flags, mds_blocking_ast,
*dchildp = dchild = vchild;
if (dchild->d_inode || (dchild->d_flags & DCACHE_CROSS_REF)) {
- int flags = 0;
+ int flags = LDLM_FL_ATOMIC_CB;
if (dchild->d_inode) {
down(&dchild->d_inode->i_sem);
if (name && IS_PDIROPS((*dparentp)->d_inode)) {
struct ldlm_res_id res_id = { .name = {0} };
ldlm_policy_data_t policy;
- int flags = 0;
+ int flags = LDLM_FL_ATOMIC_CB;
*update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode);
if (*update_mode) {
int rc = 0, cleanup_phase = 0;
struct dentry *de_src = NULL;
ldlm_policy_data_t policy;
- int flags = 0;
+ int flags = LDLM_FL_ATOMIC_CB;
ENTRY;
DEBUG_REQ(D_INODE, req, "%s: request to acquire i_nlinks "DLID4"\n",
#ifdef S_PDIROPS
if (IS_PDIROPS(de_tgt_dir->d_inode)) {
- int flags = 0;
+ int flags = LDLM_FL_ATOMIC_CB;
update_mode = mds_lock_mode_for_dir(obd, de_tgt_dir, LCK_EX);
if (update_mode) {
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
INIT_LIST_HEAD(&export->exp_outstanding_replies);
/* XXX this should be in LDLM init */
INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
+ spin_lock_init(&export->exp_ldlm_data.led_lock);
INIT_LIST_HEAD(&export->exp_handle.h_link);
class_handle_hash(&export->exp_handle, export_handle_addref);
}
/* XXX layering violation! -phil */
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
/* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
* such that filter_blocking_ast is called just before l_i_p takes the
* ns_lock, then by the time we get the lock, we might not be the
* correct blocking function anymore. So check, and return early, if
* so. */
if (lock->l_blocking_ast != filter_blocking_ast) {
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
RETURN(0);
}
lock->l_flags |= LDLM_FL_CBPENDING;
do_ast = (!lock->l_readers && !lock->l_writers);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
if (do_ast) {
struct lustre_handle lockh;
lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
lock->l_req_mode = LCK_PR;
- l_lock(&res->lr_namespace->ns_lock);
-
- res->lr_tmp = &rpc_list;
- rc = policy(lock, &tmpflags, 0, &err);
- res->lr_tmp = NULL;
+ lock_res(res);
+ rc = policy(lock, &tmpflags, 0, &err, &rpc_list);
/* FIXME: we should change the policy function slightly, to not make
* this list at all, since we just turn around and free it */
while (!list_empty(&rpc_list)) {
- struct ldlm_ast_work *w =
- list_entry(rpc_list.next, struct ldlm_ast_work, w_list);
- list_del(&w->w_list);
- LDLM_LOCK_PUT(w->w_lock);
- OBD_FREE(w, sizeof(*w));
+ struct ldlm_lock *wlock =
+ list_entry(rpc_list.next, struct ldlm_lock, l_cp_ast);
+ LASSERT((wlock->l_flags & LDLM_FL_AST_SENT) == 0);
+ LASSERT(wlock->l_flags & LDLM_FL_CP_REQD);
+ wlock->l_flags &= ~LDLM_FL_CP_REQD;
+ list_del_init(&wlock->l_cp_ast);
+ LDLM_LOCK_PUT(wlock);
}
if (rc == LDLM_ITER_CONTINUE) {
/* The lock met with no resistance; we're finished. */
- l_unlock(&res->lr_namespace->ns_lock);
+ unlock_res(res);
RETURN(ELDLM_LOCK_REPLACED);
}
* policy nicely created a list of all PW locks for us. We will choose
* the highest of those which are larger than the size in the LVB, if
* any, and perform a glimpse callback. */
- down(&res->lr_lvb_sem);
res_lvb = res->lr_lvb_data;
LASSERT(res_lvb != NULL);
*reply_lvb = *res_lvb;
- up(&res->lr_lvb_sem);
list_for_each(tmp, &res->lr_granted) {
struct ldlm_lock *tmplock =
LDLM_LOCK_PUT(l);
l = LDLM_LOCK_GET(tmplock);
}
- l_unlock(&res->lr_namespace->ns_lock);
+ unlock_res(res);
/* There were no PW locks beyond the size in the LVB; finished. */
if (l == NULL)
res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
}
- down(&res->lr_lvb_sem);
+ lock_res(res);
*reply_lvb = *res_lvb;
- up(&res->lr_lvb_sem);
+ unlock_res(res);
out:
LDLM_LOCK_PUT(l);
//GOTO(out, rc = -EPROTO);
GOTO(out, rc = 0);
}
+
+ lock_res(res);
if (new->lvb_size > lvb->lvb_size || !increase) {
CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: "
LPU64" -> "LPU64"\n", res->lr_name.name[0],
lvb->lvb_ctime, new->lvb_ctime);
lvb->lvb_ctime = new->lvb_ctime;
}
+ unlock_res(res);
}
/* Update the LVB from the disk inode */
oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
+ lock_res(res);
if (dentry->d_inode->i_size > lvb->lvb_size || !increase) {
CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size from disk: "
LPU64" -> %llu\n", res->lr_name.name[0],
LPU64" -> %lu\n", res->lr_name.name[0],
lvb->lvb_blocks, dentry->d_inode->i_blocks);
lvb->lvb_blocks = dentry->d_inode->i_blocks;
+ unlock_res(res);
f_dput(dentry);
out:
return;
}
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock_res(lock->l_resource);
#ifdef __KERNEL__
if (lock->l_ast_data && lock->l_ast_data != data) {
struct inode *new_inode = data;
}
#endif
lock->l_ast_data = data;
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ unlock_res(lock->l_resource);
LDLM_LOCK_PUT(lock);
}