From a0542a124bc08a845ff8dc25fa9aa31459ff994a Mon Sep 17 00:00:00 2001 From: braam Date: Fri, 29 Mar 2002 15:19:40 +0000 Subject: [PATCH] - add more infrastructure to handle extents and debug. - some of this is going to change dramatically as we feel we are handling too many resources now. --- lustre/include/linux/lustre_dlm.h | 31 ++++-- lustre/include/linux/lustre_idl.h | 1 + lustre/ldlm/ldlm_extent.c | 38 ++++--- lustre/ldlm/ldlm_lock.c | 229 ++++++++++++++++++++++---------------- lustre/ldlm/ldlm_lockd.c | 62 +++++++++-- lustre/ldlm/ldlm_resource.c | 66 +++++++---- lustre/ldlm/ldlm_test.c | 119 ++++++++++++++++++-- lustre/mds/handler.c | 1 - lustre/ptlrpc/service.c | 42 +++---- lustre/tests/common.sh | 1 + lustre/tests/lldlm.sh | 9 +- lustre/tests/llmountcleanup.sh | 1 + 12 files changed, 407 insertions(+), 193 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 96cef52..39c6eb1 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -78,6 +78,7 @@ struct ldlm_namespace { struct list_head ns_link; /* in the list of ns's */ __u32 ns_id; /* identifier of ns */ struct list_head *ns_hash; /* hash table for ns */ + atomic_t ns_refcount; /* count of resources in the hash */ struct list_head ns_root_list; /* all root resources in ns */ struct obd_device *ns_obddev; }; @@ -95,7 +96,7 @@ struct ldlm_namespace { struct ldlm_lock; typedef int (*ldlm_lock_callback)(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data); + void *data, __u32 data_len); struct ldlm_lock { struct ldlm_resource *l_resource; @@ -108,6 +109,8 @@ struct ldlm_lock { ldlm_lock_callback l_completion_ast; ldlm_lock_callback l_blocking_ast; struct lustre_peer *l_peer; + void *l_data; + __u32 l_data_len; //void *l_event; //XXX cluster_host l_holder; __u32 l_version[RES_VERSION_SIZE]; @@ -122,6 +125,7 @@ typedef int (*ldlm_res_policy)(struct ldlm_resource *parent, __u64 *res_id_in, #define LDLM_EXTENT 0x1 #define LDLM_MDSINTENT 0x2 +#define LDLM_MAX_TYPE 0x2 extern ldlm_res_compat ldlm_res_compat_table []; extern ldlm_res_policy ldlm_res_policy_table []; @@ -144,8 +148,6 @@ struct ldlm_resource { __u32 lr_version[RES_VERSION_SIZE]; __u32 lr_type; /* PLAIN, EXTENT, or MDSINTENT */ spinlock_t lr_lock; - - void (*lr_blocking)(struct ldlm_lock *lock, struct ldlm_lock *new); }; struct ldlm_extent { @@ -160,7 +162,9 @@ static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res) static inline void *ldlm_handle2object(struct ldlm_handle *handle) { - return (void *)(unsigned long)(handle->addr); + if (handle) + return (void *)(unsigned long)(handle->addr); + return NULL; } static inline void ldlm_object2handle(void *object, struct ldlm_handle *handle) @@ -188,29 +192,38 @@ int ldlm_extent_policy(struct ldlm_resource *, __u64 *, __u64 *, /* ldlm_lock.c */ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id, - struct ldlm_handle *parent_res_handle, struct ldlm_handle *parent_lock_handle, __u64 *res_id, + __u32 type, ldlm_mode_t mode, int *flags, ldlm_lock_callback completion, ldlm_lock_callback blocking, - __u32 data_len, void *data, + __u32 data_len, struct ldlm_handle *lockh); +ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev, + struct ldlm_handle *lockh, + int new_mode, int *flags); +ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, + struct ldlm_handle *lockh); void ldlm_lock_dump(struct ldlm_lock *lock); /* ldlm_test.c */ int ldlm_test(struct obd_device *device); /* resource.c */ -struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id); -struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id); +struct ldlm_namespace *ldlm_namespace_find(struct obd_device *, __u32 id); +struct ldlm_namespace *ldlm_namespace_new(struct obd_device *, __u32 id); +int ldlm_namespace_free(struct ldlm_namespace *ns); void ldlm_resource_dump(struct ldlm_resource *res); struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, - __u64 *name, int create); + __u64 *name, __u32 type, int create); int ldlm_resource_put(struct ldlm_resource *res); +void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, + struct ldlm_lock *lock); +void ldlm_resource_del_lock(struct ldlm_lock *lock); #endif /* __KERNEL__ */ diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index d4e5941..d036a5a 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -370,6 +370,7 @@ struct ldlm_handle { struct ldlm_request { __u32 ns_id; __u64 res_id[RES_NAME_SIZE]; + __u32 res_type; __u32 flags; struct ldlm_handle parent_res_handle; struct ldlm_handle parent_lock_handle; diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 3e1724d..2f92a1e 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -34,12 +34,12 @@ int ldlm_extent_compat(struct ldlm_resource *child, struct ldlm_resource *new) { struct ldlm_extent *child_ex, *new_ex; - child_ex = ldlm_res2extent(child); - new_ex = ldlm_res2extent(new); + child_ex = ldlm_res2extent(child); + new_ex = ldlm_res2extent(new); - if (MAX(child_ex->start, new_ex->start) > - MIN(child_ex->end, new_ex->end)) - return 0; + if (MAX(child_ex->start, new_ex->start) <= + MIN(child_ex->end, new_ex->end)) + return 0; return 1; } @@ -53,11 +53,12 @@ int ldlm_extent_compat(struct ldlm_resource *child, struct ldlm_resource *new) * * To reconstruct our formulas, take a deep breath. */ int ldlm_extent_policy(struct ldlm_resource *parent, - __u64 *res_id_in, __u64 *res_id_out, - ldlm_mode_t mode, void *data) + __u64 *res_id_in, __u64 *res_id_out, + ldlm_mode_t mode, void *data) { struct ldlm_extent *new_ex, *req_ex; struct list_head *tmp; + int rc = 0; req_ex = (struct ldlm_extent *)res_id_in; @@ -67,27 +68,32 @@ int ldlm_extent_policy(struct ldlm_resource *parent, list_for_each(tmp, &parent->lr_children) { struct ldlm_resource *res; - struct ldlm_extent *res_ex; + struct ldlm_extent *exist_ex; res = list_entry(tmp, struct ldlm_resource, lr_childof); - res_ex = ldlm_res2extent(res); + exist_ex = ldlm_res2extent(res); - if (res_ex->end < req_ex->start) - new_ex->start = MIN(res_ex->end, new_ex->start); + if (exist_ex->end < req_ex->start) + new_ex->start = MIN(exist_ex->end, new_ex->start); else { - if (res_ex->start < req_ex->start) + if (exist_ex->start < req_ex->start && + !lockmode_compat(res->lr_most_restr, mode)) /* Policy: minimize conflict overlap */ new_ex->start = req_ex->start; } - if (res_ex->start > req_ex->end) - new_ex->end = MAX(res_ex->start, new_ex->end); + if (exist_ex->start > req_ex->end) + new_ex->end = MAX(exist_ex->start, new_ex->end); else { - if (res_ex->end > req_ex->end) + if (exist_ex->end > req_ex->end && + !lockmode_compat(res->lr_most_restr, mode)) /* Policy: minimize conflict overlap */ new_ex->end = req_ex->end; } } - return 0; + if (new_ex->end != req_ex->end || new_ex->start != req_ex->start) + rc = ELDLM_RES_CHANGED; + + return rc; } diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index be1cba0..34a861d 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -45,6 +45,9 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, { struct ldlm_lock *lock; + if (resource == NULL) + LBUG(); + lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL); if (lock == NULL) return NULL; @@ -76,73 +79,91 @@ static int ldlm_notify_incompatible(struct list_head *list, rc = 1; - if (lock->l_resource->lr_blocking != NULL) - lock->l_resource->lr_blocking(lock, new); + if (lock->l_blocking_ast != NULL) + lock->l_blocking_ast(lock, new, lock->l_data, + lock->l_data_len); } return rc; } - -static int ldlm_reprocess_queue(struct list_head *queue, - struct list_head *granted_list) +static int ldlm_lock_compat(struct ldlm_lock *lock) { - struct list_head *tmp1, *tmp2; - struct ldlm_resource *res; - int rc = 0; + struct ldlm_resource *parent_res = lock->l_resource->lr_parent; + ldlm_res_compat compat; - list_for_each(tmp1, queue) { - struct ldlm_lock *pending; - rc = 0; - pending = list_entry(tmp1, struct ldlm_lock, l_res_link); - - /* check if pending can go in ... */ - list_for_each(tmp2, granted_list) { - struct ldlm_lock *lock; - lock = list_entry(tmp2, struct ldlm_lock, l_res_link); - if (lockmode_compat(lock->l_granted_mode, - pending->l_req_mode)) + if (parent_res && + (compat = ldlm_res_compat_table[parent_res->lr_type])) { + struct list_head *tmp; + int incompat = 0; + list_for_each(tmp, &parent_res->lr_children) { + struct ldlm_resource *child; + child = list_entry(tmp, struct ldlm_resource, + lr_childof); + + /* compat will return 0 when child == l_resource + * hence notifications on the same resource are incl. */ + if (compat(child, lock->l_resource)) continue; - else { - /* no, we are done */ - rc = 1; - break; - } - } - if (rc) { - /* no - we are done */ - break; + incompat |= ldlm_notify_incompatible(&child->lr_granted, + lock); } - res = pending->l_resource; - list_del(&pending->l_res_link); - list_add(&pending->l_res_link, &res->lr_granted); - pending->l_granted_mode = pending->l_req_mode; + return incompat; + } + + return ldlm_notify_incompatible(&lock->l_resource->lr_granted, lock); +} + +static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) +{ + ldlm_resource_add_lock(res, &res->lr_granted, lock); + lock->l_granted_mode = lock->l_req_mode; + + if (lock->l_granted_mode < res->lr_most_restr) + res->lr_most_restr = lock->l_granted_mode; + + if (lock->l_completion_ast) + lock->l_completion_ast(lock, NULL, NULL, 0); +} - if (pending->l_granted_mode < res->lr_most_restr) - res->lr_most_restr = pending->l_granted_mode; +static int ldlm_reprocess_queue(struct ldlm_lock *lock, + struct list_head *converting, + struct list_head *granted_list) +{ + struct list_head *tmp, *pos; + int incompat = 0; - if (pending->l_completion_ast) - pending->l_completion_ast(pending, NULL, NULL); - + list_for_each_safe(tmp, pos, converting) { + struct ldlm_lock *pending; + pending = list_entry(tmp, struct ldlm_lock, l_res_link); + + incompat = ldlm_lock_compat(pending); + if (incompat) + break; + list_del(&pending->l_res_link); + ldlm_grant_lock(pending->l_resource, pending); } - return rc; + return incompat; } +/* XXX: Revisit the error handling; we do not, for example, do + * ldlm_resource_put()s in our error cases, and we probably leak an allocated + * memory. */ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id, - struct ldlm_handle *parent_res_handle, struct ldlm_handle *parent_lock_handle, __u64 *res_id, + __u32 type, ldlm_mode_t mode, int *flags, ldlm_lock_callback completion, ldlm_lock_callback blocking, - __u32 data_len, void *data, + __u32 data_len, struct ldlm_handle *lockh) { struct ldlm_namespace *ns; @@ -150,13 +171,15 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, struct ldlm_lock *lock, *parent_lock; int incompat = 0, rc; __u64 new_id[RES_NAME_SIZE]; - ldlm_res_compat compat; ldlm_res_policy policy; ENTRY; - - parent_res = ldlm_handle2object(parent_res_handle); + parent_lock = ldlm_handle2object(parent_lock_handle); + if ( parent_lock ) + parent_res = parent_lock->l_resource; + else + parent_res = NULL; ns = ldlm_namespace_find(obddev, ns_id); if (ns == NULL || ns->ns_hash == NULL) @@ -171,7 +194,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, } } - res = ldlm_resource_get(ns, parent_res, res_id, 1); + res = ldlm_resource_get(ns, parent_res, res_id, type, 1); if (res == NULL) RETURN(-ENOMEM); @@ -179,6 +202,8 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, if (lock == NULL) RETURN(-ENOMEM); + lock->l_data = data; + lock->l_data_len = data_len; if ((*flags) & LDLM_FL_COMPLETION_AST) lock->l_completion_ast = completion; if ((*flags) & LDLM_FL_BLOCKING_AST) @@ -189,53 +214,69 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, /* FIXME: We may want to optimize by checking lr_most_restr */ if (!list_empty(&res->lr_converting)) { - list_add(&lock->l_res_link, res->lr_waiting.prev); - rc = -ELDLM_BLOCK_CONV; - GOTO(out, rc); + ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); + GOTO(out, rc = -ELDLM_BLOCK_CONV); } if (!list_empty(&res->lr_waiting)) { - list_add(&lock->l_res_link, res->lr_waiting.prev); - rc = -ELDLM_BLOCK_WAIT; - GOTO(out, rc); + ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); + GOTO(out, rc = -ELDLM_BLOCK_WAIT); } - if (parent_res && - (compat = ldlm_res_compat_table[parent_res->lr_type])) { - struct list_head *tmp; - list_for_each(tmp, &parent_res->lr_children) { - struct ldlm_resource *child; - child = list_entry(tmp, struct ldlm_resource, - lr_childof); + incompat = ldlm_lock_compat(lock); + if (incompat) { + ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); + GOTO(out, rc = -ELDLM_BLOCK_GRANTED); + } - if (compat(child, res)) - continue; + ldlm_grant_lock(res, lock); + GOTO(out, rc = ELDLM_OK); - incompat |= ldlm_notify_incompatible(&child->lr_granted, - lock); - } - } else - incompat = ldlm_notify_incompatible(&res->lr_granted, lock); + out: + spin_unlock(&res->lr_lock); + return rc; +} - if (incompat) { - list_add(&lock->l_res_link, res->lr_waiting.prev); - rc = -ELDLM_BLOCK_GRANTED; - GOTO(out, rc); +static void ldlm_reprocess_res_compat(struct ldlm_lock *lock) +{ + struct ldlm_resource *parent_res = lock->l_resource->lr_parent; + struct list_head *tmp; + int do_waiting; + + list_for_each(tmp, &parent_res->lr_children) { + struct ldlm_resource *child; + child = list_entry(tmp, struct ldlm_resource, lr_childof); + + ldlm_reprocess_queue(lock, &child->lr_converting, + &child->lr_granted); + if (!list_empty(&child->lr_converting)) + do_waiting = 0; } - list_add(&lock->l_res_link, &res->lr_granted); - lock->l_granted_mode = mode; - if (mode < res->lr_most_restr) - res->lr_most_restr = mode; + if (!do_waiting) + return; - if (lock->l_completion_ast) - lock->l_completion_ast(lock, NULL, NULL); + list_for_each(tmp, &parent_res->lr_children) { + struct ldlm_resource *child; + child = list_entry(tmp, struct ldlm_resource, lr_childof); - rc = ELDLM_OK; - GOTO(out, rc); + ldlm_reprocess_queue(lock, &child->lr_waiting, + &child->lr_granted); + } +} - out: - spin_unlock(&res->lr_lock); - return rc; +static void ldlm_reprocess_all(struct ldlm_lock *lock) +{ + struct ldlm_resource *res = lock->l_resource; + struct ldlm_resource *parent_res = res->lr_parent; + + if (parent_res && ldlm_res_compat_table[parent_res->lr_type]) { + ldlm_reprocess_res_compat(lock); + return; + } + + ldlm_reprocess_queue(lock, &res->lr_converting, &res->lr_granted); + if (list_empty(&res->lr_converting)) + ldlm_reprocess_queue(lock, &res->lr_waiting, &res->lr_granted); } ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, @@ -245,43 +286,37 @@ ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, struct ldlm_resource *res; ENTRY; - lock = (struct ldlm_lock *)(unsigned long)lockh->addr; + lock = ldlm_handle2object(lockh); res = lock->l_resource; - list_del(&lock->l_res_link); - kmem_cache_free(ldlm_lock_slab, lock); - if (ldlm_resource_put(res)) { - EXIT; - return 0; - } + ldlm_resource_del_lock(lock); - ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted); - if (list_empty(&res->lr_converting)) - ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted); + kmem_cache_free(ldlm_lock_slab, lock); + if (ldlm_resource_put(res)) + RETURN(ELDLM_OK); + ldlm_reprocess_all(lock); - return 0; + RETURN(ELDLM_OK); } ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev, struct ldlm_handle *lockh, - int new_mode, int flags) + int new_mode, int *flags) { struct ldlm_lock *lock; struct ldlm_resource *res; ENTRY; - lock = (struct ldlm_lock *)(unsigned long)lockh->addr; + lock = ldlm_handle2object(lockh); res = lock->l_resource; list_del(&lock->l_res_link); lock->l_req_mode = new_mode; - list_add(&lock->l_res_link, &res->lr_converting); + list_add(&lock->l_res_link, res->lr_converting.prev); - ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted); - if (list_empty(&res->lr_converting)) - ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted); + ldlm_reprocess_all(lock); - return 0; + RETURN(ELDLM_OK); } void ldlm_lock_dump(struct ldlm_lock *lock) diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index fbc74ca..2628812 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -26,7 +26,7 @@ extern kmem_cache_t *ldlm_resource_slab; extern kmem_cache_t *ldlm_lock_slab; static int ldlm_client_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data) + void *data, __u32 data_len) { LBUG(); return 0; @@ -55,14 +55,17 @@ static int ldlm_enqueue(struct ptlrpc_request *req) msg->xid = req_msg->xid; - err = ldlm_local_lock_enqueue(req->rq_obd, dlm_req->ns_id, - &dlm_req->parent_res_handle, + err = ldlm_local_lock_enqueue(req->rq_obd, + dlm_req->ns_id, &dlm_req->parent_lock_handle, - dlm_req->res_id, dlm_req->mode, - &dlm_req->flags, ldlm_client_callback, + dlm_req->res_id, + dlm_req->res_type, + dlm_req->mode, + &dlm_req->flags, + ldlm_client_callback, ldlm_client_callback, - req_msg->buflens[1], lustre_msg_buf(1, req_msg), + req_msg->buflens[1], &dlm_rep->lock_handle); msg->status = HTON__u32(err); @@ -190,6 +193,8 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, "self", ldlm_handle); + if (!ldlm->ldlm_service) + LBUG(); err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm"); if (err) @@ -199,6 +204,45 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) RETURN(0); } +static int do_free_namespace(struct ldlm_namespace *ns) +{ + struct list_head *tmp, *pos; + int i, rc; + + for (i = 0; i < RES_HASH_SIZE; i++) { + list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { + struct ldlm_resource *res; + res = list_entry(tmp, struct ldlm_resource, lr_hash); + list_del_init(&res->lr_hash); + + rc = 0; + while (rc == 0) + rc = ldlm_resource_put(res); + } + } + + return ldlm_namespace_free(ns); +} + +static int ldlm_free_all(struct obd_device *obddev) +{ + struct list_head *tmp, *pos; + int rc = 0; + + ldlm_lock(obddev); + + list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) { + struct ldlm_namespace *ns; + ns = list_entry(tmp, struct ldlm_namespace, ns_link); + + rc |= do_free_namespace(ns); + } + + ldlm_unlock(obddev); + + return rc; +} + static int ldlm_cleanup(struct obd_device *obddev) { struct ldlm_obd *ldlm = &obddev->u.ldlm; @@ -212,9 +256,13 @@ static int ldlm_cleanup(struct obd_device *obddev) CERROR("Request list not empty!\n"); } - rpc_unregister_service(ldlm->ldlm_service); OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service)); + if (ldlm_free_all(obddev)) { + CERROR("ldlm_free_all could not complete.\n"); + RETURN(-1); + } + MOD_DEC_USE_COUNT; RETURN(0); } diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index e00baa9..db33639 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -31,8 +31,6 @@ struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id) struct list_head *tmp; struct ldlm_namespace *res; - ldlm_lock(obddev); - res = NULL; list_for_each(tmp, &obddev->u.ldlm.ldlm_namespaces) { struct ldlm_namespace *chk; @@ -44,8 +42,6 @@ struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id) } } - ldlm_unlock(obddev); - return res; } @@ -62,10 +58,9 @@ static void res_hash_init(struct ldlm_namespace *ns) if (!res_hash) LBUG(); - for (bucket = res_hash + RES_HASH_SIZE-1 ; bucket >= res_hash ; - bucket--) { + for (bucket = res_hash + RES_HASH_SIZE - 1; bucket >= res_hash; + bucket--) INIT_LIST_HEAD(bucket); - } ns->ns_hash = res_hash; } @@ -74,8 +69,6 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id) { struct ldlm_namespace *ns; - ldlm_lock(obddev); - if (ldlm_namespace_find(obddev, id)) LBUG(); @@ -89,20 +82,30 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id) list_add(&ns->ns_link, &obddev->u.ldlm.ldlm_namespaces); res_hash_init(ns); - - ldlm_unlock(obddev); + atomic_set(&ns->ns_refcount, 0); return ns; } +int ldlm_namespace_free(struct ldlm_namespace *ns) +{ + if (atomic_read(&ns->ns_refcount)) + return -EBUSY; + + list_del(&ns->ns_link); + OBD_FREE(ns->ns_hash, sizeof(struct list_head) * RES_HASH_SIZE); + OBD_FREE(ns, sizeof(*ns)); + + return 0; +} + static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name) { __u32 hash = 0; int i; - for (i = 0; i < RES_NAME_SIZE; i++) { + for (i = 0; i < RES_NAME_SIZE; i++) hash += name[i]; - } hash += (__u32)((unsigned long)parent >> 4); @@ -119,6 +122,7 @@ static struct ldlm_resource *ldlm_resource_new(void) memset(res, 0, sizeof(*res)); INIT_LIST_HEAD(&res->lr_children); + INIT_LIST_HEAD(&res->lr_childof); INIT_LIST_HEAD(&res->lr_granted); INIT_LIST_HEAD(&res->lr_converting); INIT_LIST_HEAD(&res->lr_waiting); @@ -133,7 +137,7 @@ static struct ldlm_resource *ldlm_resource_new(void) /* ldlm_lock(obddev) must be taken before calling resource_add */ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, - __u64 *name) + __u64 *name, __u32 type) { struct list_head *bucket; struct ldlm_resource *res; @@ -146,7 +150,13 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns, memcpy(res->lr_name, name, RES_NAME_SIZE * sizeof(__u32)); res->lr_namespace = ns; + if (type < 0 || type > LDLM_MAX_TYPE) + LBUG(); + + res->lr_type = type; + res->lr_most_restr = LCK_NL; list_add(&res->lr_hash, bucket); + atomic_inc(&ns->ns_refcount); if (parent == NULL) { res->lr_parent = res; list_add(&res->lr_rootlink, &ns->ns_root_list); @@ -160,7 +170,7 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, - __u64 *name, int create) + __u64 *name, __u32 type, int create) { struct list_head *bucket; struct list_head *tmp = bucket; @@ -172,8 +182,6 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, RETURN(NULL); bucket = ns->ns_hash + ldlm_hash_fn(parent, name); - ldlm_lock(ns->ns_obddev); - res = NULL; list_for_each(tmp, bucket) { struct ldlm_resource *chk; @@ -188,9 +196,7 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, } if (res == NULL && create) - res = ldlm_resource_add(ns, parent, name); - - ldlm_unlock(ns->ns_obddev); + res = ldlm_resource_add(ns, parent, name, type); RETURN(res); } @@ -198,7 +204,9 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, int ldlm_resource_put(struct ldlm_resource *res) { int rc = 0; - ldlm_lock(res->lr_namespace->ns_obddev); + + if (atomic_read(&res->lr_refcount) <= 0) + LBUG(); if (atomic_dec_and_test(&res->lr_refcount)) { if (!list_empty(&res->lr_granted)) @@ -210,6 +218,7 @@ int ldlm_resource_put(struct ldlm_resource *res) if (!list_empty(&res->lr_waiting)) LBUG(); + atomic_dec(&res->lr_namespace->ns_refcount); list_del(&res->lr_hash); list_del(&res->lr_rootlink); list_del(&res->lr_childof); @@ -217,10 +226,23 @@ int ldlm_resource_put(struct ldlm_resource *res) kmem_cache_free(ldlm_resource_slab, res); rc = 1; } - ldlm_unlock(res->lr_namespace->ns_obddev); + return rc; } +void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, + struct ldlm_lock *lock) +{ + list_add(&lock->l_res_link, head); + atomic_inc(&res->lr_refcount); +} + +void ldlm_resource_del_lock(struct ldlm_lock *lock) +{ + list_del(&lock->l_res_link); + atomic_dec(&lock->l_resource->lr_refcount); +} + int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h) { LBUG(); diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index bef9aad..915b824 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -23,42 +23,141 @@ #include -static void ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new) +static int ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new, + void *data, __u32 data_len) { printk("ldlm_test_callback: lock=%p, new=%p\n", lock, new); + return 0; } -int ldlm_test(struct obd_device *obddev) +int ldlm_test_basics(struct obd_device *obddev) { struct ldlm_namespace *ns; struct ldlm_resource *res; - __u64 res_id[RES_NAME_SIZE] = {1, 2, 3}; + __u64 res_id[RES_NAME_SIZE] = {1, 4, 3}; ldlm_error_t err; struct ldlm_handle h; int flags; + ldlm_lock(obddev); + ns = ldlm_namespace_new(obddev, 1); if (ns == NULL) LBUG(); - res = ldlm_resource_get(ns, NULL, res_id, 1); + res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1); if (res == NULL) LBUG(); - res->lr_blocking = ldlm_test_callback; - /* Get a couple of read locks */ - err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR, - &flags, NULL, NULL, 0, NULL, &h); + flags = LDLM_FL_BLOCKING_AST; + err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN, + LCK_CR, &flags, NULL, ldlm_test_callback, + NULL, 0, &h); if (err != ELDLM_OK) LBUG(); - err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR, - &flags, NULL, NULL, 0, NULL, &h); + err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN, + LCK_EX, &flags, NULL, ldlm_test_callback, + NULL, 0, &h); + if (err != -ELDLM_BLOCK_GRANTED) + LBUG(); + + ldlm_resource_dump(res); + + ldlm_unlock(obddev); + + return 0; +} + +int ldlm_test_extents(struct obd_device *obddev) +{ + struct ldlm_namespace *ns; + struct ldlm_resource *res; + __u64 file_res_id[RES_NAME_SIZE] = {0, 0, 0}; + __u64 res_ext1[RES_NAME_SIZE] = {4, 6, 3}; + __u64 res_ext2[RES_NAME_SIZE] = {6, 9, 3}; + __u64 res_ext3[RES_NAME_SIZE] = {10, 11, 3}; + ldlm_error_t err; + struct ldlm_handle file_h, ext1_h, ext2_h, ext3_h; + int flags; + + ldlm_lock(obddev); + + ns = ldlm_namespace_new(obddev, 2); + if (ns == NULL) + LBUG(); + + err = ldlm_local_lock_enqueue(obddev, 1, NULL, file_res_id, LDLM_EXTENT, + LCK_NL, &flags, NULL, NULL, NULL, 0, + &file_h); + if (err != ELDLM_OK) + LBUG(); + + flags = 0; + err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext1, LDLM_EXTENT, + LCK_PR, &flags, NULL, NULL, NULL, 0, + &ext1_h); if (err != ELDLM_OK) LBUG(); + if (!(flags & LDLM_FL_RES_CHANGED)) + LBUG(); + flags = 0; + err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext2, LDLM_EXTENT, + LCK_PR, &flags, NULL, NULL, NULL, 0, + &ext2_h); + if (err != ELDLM_OK) + LBUG(); + if (!(flags & LDLM_FL_RES_CHANGED)) + LBUG(); + + flags = 0; + err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext3, LDLM_EXTENT, + LCK_EX, &flags, NULL, NULL, NULL, 0, + &ext3_h); + if (err != -ELDLM_BLOCK_GRANTED) + LBUG(); + if (flags & LDLM_FL_RES_CHANGED) + LBUG(); + + /* Convert/cancel blocking locks */ + flags = 0; + err = ldlm_local_lock_convert(obddev, &ext1_h, LCK_NL, &flags); + if (err != ELDLM_OK) + LBUG(); + + flags = 0; + err = ldlm_local_lock_cancel(obddev, &ext1_h); + if (err != ELDLM_OK) + LBUG(); + + /* Dump the results */ + res = ldlm_resource_get(ns, NULL, file_res_id, LDLM_EXTENT, 0); + if (res == NULL) + LBUG(); + ldlm_resource_dump(res); + res = ldlm_resource_get(ns, NULL, res_ext1, LDLM_EXTENT, 0); + if (res == NULL) + LBUG(); + ldlm_resource_dump(res); + res = ldlm_resource_get(ns, NULL, res_ext2, LDLM_EXTENT, 0); + if (res == NULL) + LBUG(); ldlm_resource_dump(res); + ldlm_unlock(obddev); + return 0; } + +int ldlm_test(struct obd_device *obddev) +{ + int rc; + rc = ldlm_test_basics(obddev); + if (rc) + RETURN(rc); + + rc = ldlm_test_extents(obddev); + RETURN(rc); +} diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 6aa6ab5..b9db2cb 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -536,7 +536,6 @@ static int mds_cleanup(struct obd_device * obddev) CERROR("Request list not empty!\n"); } - rpc_unregister_service(mds->mds_service); OBD_FREE(mds->mds_service, sizeof(*mds->mds_service)); sb = mds->mds_sb; diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 1612597..4ec408e 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -41,33 +41,25 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc) spin_lock(&svc->srv_lock); if (sigismember(&(current->pending.signal), SIGKILL) || - sigismember(&(current->pending.signal), SIGSTOP) || - sigismember(&(current->pending.signal), SIGCONT) || + sigismember(&(current->pending.signal), SIGTERM) || sigismember(&(current->pending.signal), SIGINT)) { svc->srv_flags |= SVC_KILLED; - EXIT; - rc = 1; - goto out; + GOTO(out, rc = 1); } - if ( svc->srv_flags & SVC_STOPPING ) { - EXIT; - rc = 1; - goto out; - } + if (svc->srv_flags & SVC_STOPPING) + GOTO(out, rc = 1); if (svc->srv_flags & SVC_EVENT) LBUG(); - if ( svc->srv_eq_h ) { + if (svc->srv_eq_h) { int err; err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev); if (err == PTL_OK) { svc->srv_flags |= SVC_EVENT; - EXIT; - rc = 1; - goto out; + GOTO(out, rc = 1); } if (err != PTL_EQ_EMPTY) { @@ -75,16 +67,12 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc) LBUG(); } - EXIT; - rc = 0; - goto out; + GOTO(out, rc = 0); } if (!list_empty(&svc->srv_reqs)) { svc->srv_flags |= SVC_LIST; - EXIT; - rc = 1; - goto out; + GOTO(out, rc = 1); } EXIT; @@ -192,14 +180,14 @@ static int ptlrpc_main(void *arg) spin_lock(&svc->srv_lock); if (svc->srv_flags & SVC_SIGNAL) { - EXIT; spin_unlock(&svc->srv_lock); + EXIT; break; } if (svc->srv_flags & SVC_STOPPING) { - EXIT; spin_unlock(&svc->srv_lock); + EXIT; break; } @@ -246,6 +234,7 @@ static int ptlrpc_main(void *arg) } CERROR("unknown break in service"); spin_unlock(&svc->srv_lock); + EXIT; break; } @@ -281,18 +270,15 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, init_waitqueue_head(&svc->srv_ctl_waitq); rc = kernel_thread(ptlrpc_main, (void *) &d, CLONE_VM | CLONE_FS | CLONE_FILES); - if (rc < 0) { + if (rc < 0) { CERROR("cannot start thread\n"); - return -EINVAL; + RETURN(-EINVAL); } wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING); - EXIT; - return 0; + RETURN(0); } - - int rpc_unregister_service(struct ptlrpc_service *service) { int rc, i; diff --git a/lustre/tests/common.sh b/lustre/tests/common.sh index a7b0227..328dd76 100644 --- a/lustre/tests/common.sh +++ b/lustre/tests/common.sh @@ -63,6 +63,7 @@ setup() { insmod $SRCDIR/../../obd/class/obdclass.o || exit -1 insmod $SRCDIR/../../obd/rpc/ptlrpc.o || exit -1 + insmod $SRCDIR/../../obd/ldlm/ldlm.o || exit -1 insmod $SRCDIR/../../obd/ext2obd/obdext2.o || exit -1 insmod $SRCDIR/../../obd/ost/ost.o || exit -1 insmod $SRCDIR/../../obd/osc/osc.o || exit -1 diff --git a/lustre/tests/lldlm.sh b/lustre/tests/lldlm.sh index 1ab3d24..462d5df 100755 --- a/lustre/tests/lldlm.sh +++ b/lustre/tests/lldlm.sh @@ -3,10 +3,13 @@ SRCDIR="`dirname $0`" . $SRCDIR/common.sh -setup_ldlm +NETWORK=tcp +LOCALHOST=localhost +SERVER=localhost +PORT=1234 -mknod /dev/obd c 10 241 -echo 0xffffffff > /proc/sys/portals/debug +setup +setup_portals $OBDCTL <