struct list_head ns_link; /* in the list of ns's */
__u32 ns_id; /* identifier of ns */
struct list_head *ns_hash; /* hash table for ns */
+ atomic_t ns_refcount; /* count of resources in the hash */
struct list_head ns_root_list; /* all root resources in ns */
struct obd_device *ns_obddev;
};
struct ldlm_lock;
typedef int (*ldlm_lock_callback)(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data);
+ void *data, __u32 data_len);
struct ldlm_lock {
struct ldlm_resource *l_resource;
ldlm_lock_callback l_completion_ast;
ldlm_lock_callback l_blocking_ast;
struct lustre_peer *l_peer;
+ void *l_data;
+ __u32 l_data_len;
//void *l_event;
//XXX cluster_host l_holder;
__u32 l_version[RES_VERSION_SIZE];
#define LDLM_EXTENT 0x1
#define LDLM_MDSINTENT 0x2
+#define LDLM_MAX_TYPE 0x2
extern ldlm_res_compat ldlm_res_compat_table [];
extern ldlm_res_policy ldlm_res_policy_table [];
__u32 lr_version[RES_VERSION_SIZE];
__u32 lr_type; /* PLAIN, EXTENT, or MDSINTENT */
spinlock_t lr_lock;
-
- void (*lr_blocking)(struct ldlm_lock *lock, struct ldlm_lock *new);
};
struct ldlm_extent {
static inline void *ldlm_handle2object(struct ldlm_handle *handle)
{
- return (void *)(unsigned long)(handle->addr);
+ if (handle)
+ return (void *)(unsigned long)(handle->addr);
+ return NULL;
}
static inline void ldlm_object2handle(void *object, struct ldlm_handle *handle)
/* ldlm_lock.c */
ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
__u32 ns_id,
- struct ldlm_handle *parent_res_handle,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id,
+ __u32 type,
ldlm_mode_t mode,
int *flags,
ldlm_lock_callback completion,
ldlm_lock_callback blocking,
- __u32 data_len,
void *data,
+ __u32 data_len,
struct ldlm_handle *lockh);
+ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev,
+ struct ldlm_handle *lockh,
+ int new_mode, int *flags);
+ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
+ struct ldlm_handle *lockh);
void ldlm_lock_dump(struct ldlm_lock *lock);
/* ldlm_test.c */
int ldlm_test(struct obd_device *device);
/* resource.c */
-struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id);
-struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id);
+struct ldlm_namespace *ldlm_namespace_find(struct obd_device *, __u32 id);
+struct ldlm_namespace *ldlm_namespace_new(struct obd_device *, __u32 id);
+int ldlm_namespace_free(struct ldlm_namespace *ns);
void ldlm_resource_dump(struct ldlm_resource *res);
struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
- __u64 *name, int create);
+ __u64 *name, __u32 type, int create);
int ldlm_resource_put(struct ldlm_resource *res);
+void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
+ struct ldlm_lock *lock);
+void ldlm_resource_del_lock(struct ldlm_lock *lock);
#endif /* __KERNEL__ */
struct ldlm_request {
__u32 ns_id;
__u64 res_id[RES_NAME_SIZE];
+ __u32 res_type;
__u32 flags;
struct ldlm_handle parent_res_handle;
struct ldlm_handle parent_lock_handle;
{
struct ldlm_extent *child_ex, *new_ex;
- child_ex = ldlm_res2extent(child);
- new_ex = ldlm_res2extent(new);
+ child_ex = ldlm_res2extent(child);
+ new_ex = ldlm_res2extent(new);
- if (MAX(child_ex->start, new_ex->start) >
- MIN(child_ex->end, new_ex->end))
- return 0;
+ if (MAX(child_ex->start, new_ex->start) <=
+ MIN(child_ex->end, new_ex->end))
+ return 0;
return 1;
}
*
* To reconstruct our formulas, take a deep breath. */
int ldlm_extent_policy(struct ldlm_resource *parent,
- __u64 *res_id_in, __u64 *res_id_out,
- ldlm_mode_t mode, void *data)
+ __u64 *res_id_in, __u64 *res_id_out,
+ ldlm_mode_t mode, void *data)
{
struct ldlm_extent *new_ex, *req_ex;
struct list_head *tmp;
+ int rc = 0;
req_ex = (struct ldlm_extent *)res_id_in;
list_for_each(tmp, &parent->lr_children) {
struct ldlm_resource *res;
- struct ldlm_extent *res_ex;
+ struct ldlm_extent *exist_ex;
res = list_entry(tmp, struct ldlm_resource, lr_childof);
- res_ex = ldlm_res2extent(res);
+ exist_ex = ldlm_res2extent(res);
- if (res_ex->end < req_ex->start)
- new_ex->start = MIN(res_ex->end, new_ex->start);
+ if (exist_ex->end < req_ex->start)
+ new_ex->start = MIN(exist_ex->end, new_ex->start);
else {
- if (res_ex->start < req_ex->start)
+ if (exist_ex->start < req_ex->start &&
+ !lockmode_compat(res->lr_most_restr, mode))
/* Policy: minimize conflict overlap */
new_ex->start = req_ex->start;
}
- if (res_ex->start > req_ex->end)
- new_ex->end = MAX(res_ex->start, new_ex->end);
+ if (exist_ex->start > req_ex->end)
+ new_ex->end = MAX(exist_ex->start, new_ex->end);
else {
- if (res_ex->end > req_ex->end)
+ if (exist_ex->end > req_ex->end &&
+ !lockmode_compat(res->lr_most_restr, mode))
/* Policy: minimize conflict overlap */
new_ex->end = req_ex->end;
}
}
- return 0;
+ if (new_ex->end != req_ex->end || new_ex->start != req_ex->start)
+ rc = ELDLM_RES_CHANGED;
+
+ return rc;
}
{
struct ldlm_lock *lock;
+ if (resource == NULL)
+ LBUG();
+
lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
if (lock == NULL)
return NULL;
rc = 1;
- if (lock->l_resource->lr_blocking != NULL)
- lock->l_resource->lr_blocking(lock, new);
+ if (lock->l_blocking_ast != NULL)
+ lock->l_blocking_ast(lock, new, lock->l_data,
+ lock->l_data_len);
}
return rc;
}
-
-static int ldlm_reprocess_queue(struct list_head *queue,
- struct list_head *granted_list)
+static int ldlm_lock_compat(struct ldlm_lock *lock)
{
- struct list_head *tmp1, *tmp2;
- struct ldlm_resource *res;
- int rc = 0;
+ struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
+ ldlm_res_compat compat;
- list_for_each(tmp1, queue) {
- struct ldlm_lock *pending;
- rc = 0;
- pending = list_entry(tmp1, struct ldlm_lock, l_res_link);
-
- /* check if pending can go in ... */
- list_for_each(tmp2, granted_list) {
- struct ldlm_lock *lock;
- lock = list_entry(tmp2, struct ldlm_lock, l_res_link);
- if (lockmode_compat(lock->l_granted_mode,
- pending->l_req_mode))
+ if (parent_res &&
+ (compat = ldlm_res_compat_table[parent_res->lr_type])) {
+ struct list_head *tmp;
+ int incompat = 0;
+ list_for_each(tmp, &parent_res->lr_children) {
+ struct ldlm_resource *child;
+ child = list_entry(tmp, struct ldlm_resource,
+ lr_childof);
+
+ /* compat will return 0 when child == l_resource
+ * hence notifications on the same resource are incl. */
+ if (compat(child, lock->l_resource))
continue;
- else {
- /* no, we are done */
- rc = 1;
- break;
- }
- }
- if (rc) {
- /* no - we are done */
- break;
+ incompat |= ldlm_notify_incompatible(&child->lr_granted,
+ lock);
}
- res = pending->l_resource;
- list_del(&pending->l_res_link);
- list_add(&pending->l_res_link, &res->lr_granted);
- pending->l_granted_mode = pending->l_req_mode;
+ return incompat;
+ }
+
+ return ldlm_notify_incompatible(&lock->l_resource->lr_granted, lock);
+}
+
+static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
+{
+ ldlm_resource_add_lock(res, &res->lr_granted, lock);
+ lock->l_granted_mode = lock->l_req_mode;
+
+ if (lock->l_granted_mode < res->lr_most_restr)
+ res->lr_most_restr = lock->l_granted_mode;
+
+ if (lock->l_completion_ast)
+ lock->l_completion_ast(lock, NULL, NULL, 0);
+}
- if (pending->l_granted_mode < res->lr_most_restr)
- res->lr_most_restr = pending->l_granted_mode;
+static int ldlm_reprocess_queue(struct ldlm_lock *lock,
+ struct list_head *converting,
+ struct list_head *granted_list)
+{
+ struct list_head *tmp, *pos;
+ int incompat = 0;
- if (pending->l_completion_ast)
- pending->l_completion_ast(pending, NULL, NULL);
-
+ list_for_each_safe(tmp, pos, converting) {
+ struct ldlm_lock *pending;
+ pending = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ incompat = ldlm_lock_compat(pending);
+ if (incompat)
+ break;
+ list_del(&pending->l_res_link);
+ ldlm_grant_lock(pending->l_resource, pending);
}
- return rc;
+ return incompat;
}
+/* XXX: Revisit the error handling; we do not, for example, do
+ * ldlm_resource_put()s in our error cases, and we probably leak an allocated
+ * memory. */
ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
__u32 ns_id,
- struct ldlm_handle *parent_res_handle,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id,
+ __u32 type,
ldlm_mode_t mode,
int *flags,
ldlm_lock_callback completion,
ldlm_lock_callback blocking,
- __u32 data_len,
void *data,
+ __u32 data_len,
struct ldlm_handle *lockh)
{
struct ldlm_namespace *ns;
struct ldlm_lock *lock, *parent_lock;
int incompat = 0, rc;
__u64 new_id[RES_NAME_SIZE];
- ldlm_res_compat compat;
ldlm_res_policy policy;
ENTRY;
-
- parent_res = ldlm_handle2object(parent_res_handle);
+
parent_lock = ldlm_handle2object(parent_lock_handle);
+ if ( parent_lock )
+ parent_res = parent_lock->l_resource;
+ else
+ parent_res = NULL;
ns = ldlm_namespace_find(obddev, ns_id);
if (ns == NULL || ns->ns_hash == NULL)
}
}
- res = ldlm_resource_get(ns, parent_res, res_id, 1);
+ res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
if (res == NULL)
RETURN(-ENOMEM);
if (lock == NULL)
RETURN(-ENOMEM);
+ lock->l_data = data;
+ lock->l_data_len = data_len;
if ((*flags) & LDLM_FL_COMPLETION_AST)
lock->l_completion_ast = completion;
if ((*flags) & LDLM_FL_BLOCKING_AST)
/* FIXME: We may want to optimize by checking lr_most_restr */
if (!list_empty(&res->lr_converting)) {
- list_add(&lock->l_res_link, res->lr_waiting.prev);
- rc = -ELDLM_BLOCK_CONV;
- GOTO(out, rc);
+ ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+ GOTO(out, rc = -ELDLM_BLOCK_CONV);
}
if (!list_empty(&res->lr_waiting)) {
- list_add(&lock->l_res_link, res->lr_waiting.prev);
- rc = -ELDLM_BLOCK_WAIT;
- GOTO(out, rc);
+ ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+ GOTO(out, rc = -ELDLM_BLOCK_WAIT);
}
- if (parent_res &&
- (compat = ldlm_res_compat_table[parent_res->lr_type])) {
- struct list_head *tmp;
- list_for_each(tmp, &parent_res->lr_children) {
- struct ldlm_resource *child;
- child = list_entry(tmp, struct ldlm_resource,
- lr_childof);
+ incompat = ldlm_lock_compat(lock);
+ if (incompat) {
+ ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+ GOTO(out, rc = -ELDLM_BLOCK_GRANTED);
+ }
- if (compat(child, res))
- continue;
+ ldlm_grant_lock(res, lock);
+ GOTO(out, rc = ELDLM_OK);
- incompat |= ldlm_notify_incompatible(&child->lr_granted,
- lock);
- }
- } else
- incompat = ldlm_notify_incompatible(&res->lr_granted, lock);
+ out:
+ spin_unlock(&res->lr_lock);
+ return rc;
+}
- if (incompat) {
- list_add(&lock->l_res_link, res->lr_waiting.prev);
- rc = -ELDLM_BLOCK_GRANTED;
- GOTO(out, rc);
+static void ldlm_reprocess_res_compat(struct ldlm_lock *lock)
+{
+ struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
+ struct list_head *tmp;
+ int do_waiting;
+
+ list_for_each(tmp, &parent_res->lr_children) {
+ struct ldlm_resource *child;
+ child = list_entry(tmp, struct ldlm_resource, lr_childof);
+
+ ldlm_reprocess_queue(lock, &child->lr_converting,
+ &child->lr_granted);
+ if (!list_empty(&child->lr_converting))
+ do_waiting = 0;
}
- list_add(&lock->l_res_link, &res->lr_granted);
- lock->l_granted_mode = mode;
- if (mode < res->lr_most_restr)
- res->lr_most_restr = mode;
+ if (!do_waiting)
+ return;
- if (lock->l_completion_ast)
- lock->l_completion_ast(lock, NULL, NULL);
+ list_for_each(tmp, &parent_res->lr_children) {
+ struct ldlm_resource *child;
+ child = list_entry(tmp, struct ldlm_resource, lr_childof);
- rc = ELDLM_OK;
- GOTO(out, rc);
+ ldlm_reprocess_queue(lock, &child->lr_waiting,
+ &child->lr_granted);
+ }
+}
- out:
- spin_unlock(&res->lr_lock);
- return rc;
+static void ldlm_reprocess_all(struct ldlm_lock *lock)
+{
+ struct ldlm_resource *res = lock->l_resource;
+ struct ldlm_resource *parent_res = res->lr_parent;
+
+ if (parent_res && ldlm_res_compat_table[parent_res->lr_type]) {
+ ldlm_reprocess_res_compat(lock);
+ return;
+ }
+
+ ldlm_reprocess_queue(lock, &res->lr_converting, &res->lr_granted);
+ if (list_empty(&res->lr_converting))
+ ldlm_reprocess_queue(lock, &res->lr_waiting, &res->lr_granted);
}
ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
struct ldlm_resource *res;
ENTRY;
- lock = (struct ldlm_lock *)(unsigned long)lockh->addr;
+ lock = ldlm_handle2object(lockh);
res = lock->l_resource;
- list_del(&lock->l_res_link);
- kmem_cache_free(ldlm_lock_slab, lock);
- if (ldlm_resource_put(res)) {
- EXIT;
- return 0;
- }
+ ldlm_resource_del_lock(lock);
- ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted);
- if (list_empty(&res->lr_converting))
- ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted);
+ kmem_cache_free(ldlm_lock_slab, lock);
+ if (ldlm_resource_put(res))
+ RETURN(ELDLM_OK);
+ ldlm_reprocess_all(lock);
- return 0;
+ RETURN(ELDLM_OK);
}
ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev,
struct ldlm_handle *lockh,
- int new_mode, int flags)
+ int new_mode, int *flags)
{
struct ldlm_lock *lock;
struct ldlm_resource *res;
ENTRY;
- lock = (struct ldlm_lock *)(unsigned long)lockh->addr;
+ lock = ldlm_handle2object(lockh);
res = lock->l_resource;
list_del(&lock->l_res_link);
lock->l_req_mode = new_mode;
- list_add(&lock->l_res_link, &res->lr_converting);
+ list_add(&lock->l_res_link, res->lr_converting.prev);
- ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted);
- if (list_empty(&res->lr_converting))
- ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted);
+ ldlm_reprocess_all(lock);
- return 0;
+ RETURN(ELDLM_OK);
}
void ldlm_lock_dump(struct ldlm_lock *lock)
extern kmem_cache_t *ldlm_lock_slab;
static int ldlm_client_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data)
+ void *data, __u32 data_len)
{
LBUG();
return 0;
msg->xid = req_msg->xid;
- err = ldlm_local_lock_enqueue(req->rq_obd, dlm_req->ns_id,
- &dlm_req->parent_res_handle,
+ err = ldlm_local_lock_enqueue(req->rq_obd,
+ dlm_req->ns_id,
&dlm_req->parent_lock_handle,
- dlm_req->res_id, dlm_req->mode,
- &dlm_req->flags, ldlm_client_callback,
+ dlm_req->res_id,
+ dlm_req->res_type,
+ dlm_req->mode,
+ &dlm_req->flags,
+ ldlm_client_callback,
ldlm_client_callback,
- req_msg->buflens[1],
lustre_msg_buf(1, req_msg),
+ req_msg->buflens[1],
&dlm_rep->lock_handle);
msg->status = HTON__u32(err);
LDLM_REQUEST_PORTAL,
LDLM_REPLY_PORTAL,
"self", ldlm_handle);
+ if (!ldlm->ldlm_service)
+ LBUG();
err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
if (err)
RETURN(0);
}
+static int do_free_namespace(struct ldlm_namespace *ns)
+{
+ struct list_head *tmp, *pos;
+ int i, rc;
+
+ for (i = 0; i < RES_HASH_SIZE; i++) {
+ list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+ struct ldlm_resource *res;
+ res = list_entry(tmp, struct ldlm_resource, lr_hash);
+ list_del_init(&res->lr_hash);
+
+ rc = 0;
+ while (rc == 0)
+ rc = ldlm_resource_put(res);
+ }
+ }
+
+ return ldlm_namespace_free(ns);
+}
+
+static int ldlm_free_all(struct obd_device *obddev)
+{
+ struct list_head *tmp, *pos;
+ int rc = 0;
+
+ ldlm_lock(obddev);
+
+ list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) {
+ struct ldlm_namespace *ns;
+ ns = list_entry(tmp, struct ldlm_namespace, ns_link);
+
+ rc |= do_free_namespace(ns);
+ }
+
+ ldlm_unlock(obddev);
+
+ return rc;
+}
+
static int ldlm_cleanup(struct obd_device *obddev)
{
struct ldlm_obd *ldlm = &obddev->u.ldlm;
CERROR("Request list not empty!\n");
}
- rpc_unregister_service(ldlm->ldlm_service);
OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
+ if (ldlm_free_all(obddev)) {
+ CERROR("ldlm_free_all could not complete.\n");
+ RETURN(-1);
+ }
+
MOD_DEC_USE_COUNT;
RETURN(0);
}
struct list_head *tmp;
struct ldlm_namespace *res;
- ldlm_lock(obddev);
-
res = NULL;
list_for_each(tmp, &obddev->u.ldlm.ldlm_namespaces) {
struct ldlm_namespace *chk;
}
}
- ldlm_unlock(obddev);
-
return res;
}
if (!res_hash)
LBUG();
- for (bucket = res_hash + RES_HASH_SIZE-1 ; bucket >= res_hash ;
- bucket--) {
+ for (bucket = res_hash + RES_HASH_SIZE - 1; bucket >= res_hash;
+ bucket--)
INIT_LIST_HEAD(bucket);
- }
ns->ns_hash = res_hash;
}
{
struct ldlm_namespace *ns;
- ldlm_lock(obddev);
-
if (ldlm_namespace_find(obddev, id))
LBUG();
list_add(&ns->ns_link, &obddev->u.ldlm.ldlm_namespaces);
res_hash_init(ns);
-
- ldlm_unlock(obddev);
+ atomic_set(&ns->ns_refcount, 0);
return ns;
}
+int ldlm_namespace_free(struct ldlm_namespace *ns)
+{
+ if (atomic_read(&ns->ns_refcount))
+ return -EBUSY;
+
+ list_del(&ns->ns_link);
+ OBD_FREE(ns->ns_hash, sizeof(struct list_head) * RES_HASH_SIZE);
+ OBD_FREE(ns, sizeof(*ns));
+
+ return 0;
+}
+
static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name)
{
__u32 hash = 0;
int i;
- for (i = 0; i < RES_NAME_SIZE; i++) {
+ for (i = 0; i < RES_NAME_SIZE; i++)
hash += name[i];
- }
hash += (__u32)((unsigned long)parent >> 4);
memset(res, 0, sizeof(*res));
INIT_LIST_HEAD(&res->lr_children);
+ INIT_LIST_HEAD(&res->lr_childof);
INIT_LIST_HEAD(&res->lr_granted);
INIT_LIST_HEAD(&res->lr_converting);
INIT_LIST_HEAD(&res->lr_waiting);
/* ldlm_lock(obddev) must be taken before calling resource_add */
static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
- __u64 *name)
+ __u64 *name, __u32 type)
{
struct list_head *bucket;
struct ldlm_resource *res;
memcpy(res->lr_name, name, RES_NAME_SIZE * sizeof(__u32));
res->lr_namespace = ns;
+ if (type < 0 || type > LDLM_MAX_TYPE)
+ LBUG();
+
+ res->lr_type = type;
+ res->lr_most_restr = LCK_NL;
list_add(&res->lr_hash, bucket);
+ atomic_inc(&ns->ns_refcount);
if (parent == NULL) {
res->lr_parent = res;
list_add(&res->lr_rootlink, &ns->ns_root_list);
struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
- __u64 *name, int create)
+ __u64 *name, __u32 type, int create)
{
struct list_head *bucket;
struct list_head *tmp = bucket;
RETURN(NULL);
bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
- ldlm_lock(ns->ns_obddev);
-
res = NULL;
list_for_each(tmp, bucket) {
struct ldlm_resource *chk;
}
if (res == NULL && create)
- res = ldlm_resource_add(ns, parent, name);
-
- ldlm_unlock(ns->ns_obddev);
+ res = ldlm_resource_add(ns, parent, name, type);
RETURN(res);
}
int ldlm_resource_put(struct ldlm_resource *res)
{
int rc = 0;
- ldlm_lock(res->lr_namespace->ns_obddev);
+
+ if (atomic_read(&res->lr_refcount) <= 0)
+ LBUG();
if (atomic_dec_and_test(&res->lr_refcount)) {
if (!list_empty(&res->lr_granted))
if (!list_empty(&res->lr_waiting))
LBUG();
+ atomic_dec(&res->lr_namespace->ns_refcount);
list_del(&res->lr_hash);
list_del(&res->lr_rootlink);
list_del(&res->lr_childof);
kmem_cache_free(ldlm_resource_slab, res);
rc = 1;
}
- ldlm_unlock(res->lr_namespace->ns_obddev);
+
return rc;
}
+void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
+ struct ldlm_lock *lock)
+{
+ list_add(&lock->l_res_link, head);
+ atomic_inc(&res->lr_refcount);
+}
+
+void ldlm_resource_del_lock(struct ldlm_lock *lock)
+{
+ list_del(&lock->l_res_link);
+ atomic_dec(&lock->l_resource->lr_refcount);
+}
+
int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h)
{
LBUG();
#include <linux/lustre_dlm.h>
-static void ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new)
+static int ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
+ void *data, __u32 data_len)
{
printk("ldlm_test_callback: lock=%p, new=%p\n", lock, new);
+ return 0;
}
-int ldlm_test(struct obd_device *obddev)
+int ldlm_test_basics(struct obd_device *obddev)
{
struct ldlm_namespace *ns;
struct ldlm_resource *res;
- __u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
+ __u64 res_id[RES_NAME_SIZE] = {1, 4, 3};
ldlm_error_t err;
struct ldlm_handle h;
int flags;
+ ldlm_lock(obddev);
+
ns = ldlm_namespace_new(obddev, 1);
if (ns == NULL)
LBUG();
- res = ldlm_resource_get(ns, NULL, res_id, 1);
+ res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1);
if (res == NULL)
LBUG();
- res->lr_blocking = ldlm_test_callback;
-
/* Get a couple of read locks */
- err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR,
- &flags, NULL, NULL, 0, NULL, &h);
+ flags = LDLM_FL_BLOCKING_AST;
+ err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
+ LCK_CR, &flags, NULL, ldlm_test_callback,
+ NULL, 0, &h);
if (err != ELDLM_OK)
LBUG();
- err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR,
- &flags, NULL, NULL, 0, NULL, &h);
+ err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
+ LCK_EX, &flags, NULL, ldlm_test_callback,
+ NULL, 0, &h);
+ if (err != -ELDLM_BLOCK_GRANTED)
+ LBUG();
+
+ ldlm_resource_dump(res);
+
+ ldlm_unlock(obddev);
+
+ return 0;
+}
+
+int ldlm_test_extents(struct obd_device *obddev)
+{
+ struct ldlm_namespace *ns;
+ struct ldlm_resource *res;
+ __u64 file_res_id[RES_NAME_SIZE] = {0, 0, 0};
+ __u64 res_ext1[RES_NAME_SIZE] = {4, 6, 3};
+ __u64 res_ext2[RES_NAME_SIZE] = {6, 9, 3};
+ __u64 res_ext3[RES_NAME_SIZE] = {10, 11, 3};
+ ldlm_error_t err;
+ struct ldlm_handle file_h, ext1_h, ext2_h, ext3_h;
+ int flags;
+
+ ldlm_lock(obddev);
+
+ ns = ldlm_namespace_new(obddev, 2);
+ if (ns == NULL)
+ LBUG();
+
+ err = ldlm_local_lock_enqueue(obddev, 1, NULL, file_res_id, LDLM_EXTENT,
+ LCK_NL, &flags, NULL, NULL, NULL, 0,
+ &file_h);
+ if (err != ELDLM_OK)
+ LBUG();
+
+ flags = 0;
+ err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext1, LDLM_EXTENT,
+ LCK_PR, &flags, NULL, NULL, NULL, 0,
+ &ext1_h);
if (err != ELDLM_OK)
LBUG();
+ if (!(flags & LDLM_FL_RES_CHANGED))
+ LBUG();
+ flags = 0;
+ err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext2, LDLM_EXTENT,
+ LCK_PR, &flags, NULL, NULL, NULL, 0,
+ &ext2_h);
+ if (err != ELDLM_OK)
+ LBUG();
+ if (!(flags & LDLM_FL_RES_CHANGED))
+ LBUG();
+
+ flags = 0;
+ err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext3, LDLM_EXTENT,
+ LCK_EX, &flags, NULL, NULL, NULL, 0,
+ &ext3_h);
+ if (err != -ELDLM_BLOCK_GRANTED)
+ LBUG();
+ if (flags & LDLM_FL_RES_CHANGED)
+ LBUG();
+
+ /* Convert/cancel blocking locks */
+ flags = 0;
+ err = ldlm_local_lock_convert(obddev, &ext1_h, LCK_NL, &flags);
+ if (err != ELDLM_OK)
+ LBUG();
+
+ flags = 0;
+ err = ldlm_local_lock_cancel(obddev, &ext1_h);
+ if (err != ELDLM_OK)
+ LBUG();
+
+ /* Dump the results */
+ res = ldlm_resource_get(ns, NULL, file_res_id, LDLM_EXTENT, 0);
+ if (res == NULL)
+ LBUG();
+ ldlm_resource_dump(res);
+ res = ldlm_resource_get(ns, NULL, res_ext1, LDLM_EXTENT, 0);
+ if (res == NULL)
+ LBUG();
+ ldlm_resource_dump(res);
+ res = ldlm_resource_get(ns, NULL, res_ext2, LDLM_EXTENT, 0);
+ if (res == NULL)
+ LBUG();
ldlm_resource_dump(res);
+ ldlm_unlock(obddev);
+
return 0;
}
+
+int ldlm_test(struct obd_device *obddev)
+{
+ int rc;
+ rc = ldlm_test_basics(obddev);
+ if (rc)
+ RETURN(rc);
+
+ rc = ldlm_test_extents(obddev);
+ RETURN(rc);
+}
CERROR("Request list not empty!\n");
}
- rpc_unregister_service(mds->mds_service);
OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
sb = mds->mds_sb;
spin_lock(&svc->srv_lock);
if (sigismember(&(current->pending.signal), SIGKILL) ||
- sigismember(&(current->pending.signal), SIGSTOP) ||
- sigismember(&(current->pending.signal), SIGCONT) ||
+ sigismember(&(current->pending.signal), SIGTERM) ||
sigismember(&(current->pending.signal), SIGINT)) {
svc->srv_flags |= SVC_KILLED;
- EXIT;
- rc = 1;
- goto out;
+ GOTO(out, rc = 1);
}
- if ( svc->srv_flags & SVC_STOPPING ) {
- EXIT;
- rc = 1;
- goto out;
- }
+ if (svc->srv_flags & SVC_STOPPING)
+ GOTO(out, rc = 1);
if (svc->srv_flags & SVC_EVENT)
LBUG();
- if ( svc->srv_eq_h ) {
+ if (svc->srv_eq_h) {
int err;
err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
if (err == PTL_OK) {
svc->srv_flags |= SVC_EVENT;
- EXIT;
- rc = 1;
- goto out;
+ GOTO(out, rc = 1);
}
if (err != PTL_EQ_EMPTY) {
LBUG();
}
- EXIT;
- rc = 0;
- goto out;
+ GOTO(out, rc = 0);
}
if (!list_empty(&svc->srv_reqs)) {
svc->srv_flags |= SVC_LIST;
- EXIT;
- rc = 1;
- goto out;
+ GOTO(out, rc = 1);
}
EXIT;
spin_lock(&svc->srv_lock);
if (svc->srv_flags & SVC_SIGNAL) {
- EXIT;
spin_unlock(&svc->srv_lock);
+ EXIT;
break;
}
if (svc->srv_flags & SVC_STOPPING) {
- EXIT;
spin_unlock(&svc->srv_lock);
+ EXIT;
break;
}
}
CERROR("unknown break in service");
spin_unlock(&svc->srv_lock);
+ EXIT;
break;
}
init_waitqueue_head(&svc->srv_ctl_waitq);
rc = kernel_thread(ptlrpc_main, (void *) &d,
CLONE_VM | CLONE_FS | CLONE_FILES);
- if (rc < 0) {
+ if (rc < 0) {
CERROR("cannot start thread\n");
- return -EINVAL;
+ RETURN(-EINVAL);
}
wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING);
- EXIT;
- return 0;
+ RETURN(0);
}
-
-
int rpc_unregister_service(struct ptlrpc_service *service)
{
int rc, i;
insmod $SRCDIR/../../obd/class/obdclass.o || exit -1
insmod $SRCDIR/../../obd/rpc/ptlrpc.o || exit -1
+ insmod $SRCDIR/../../obd/ldlm/ldlm.o || exit -1
insmod $SRCDIR/../../obd/ext2obd/obdext2.o || exit -1
insmod $SRCDIR/../../obd/ost/ost.o || exit -1
insmod $SRCDIR/../../obd/osc/osc.o || exit -1
SRCDIR="`dirname $0`"
. $SRCDIR/common.sh
-setup_ldlm
+NETWORK=tcp
+LOCALHOST=localhost
+SERVER=localhost
+PORT=1234
-mknod /dev/obd c 10 241
-echo 0xffffffff > /proc/sys/portals/debug
+setup
+setup_portals
$OBDCTL <<EOF
device 0
rmmod osc
rmmod ost
rmmod obdext2
+rmmod ldlm
rmmod ptlrpc
rmmod obdclass